diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 57dd6cd8c2c2..690af80e727b 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -131,7 +131,7 @@ class RowReaderOptions { // (in dwrf row reader). todo: encapsulate this and keySelectionCallBack_ in a // struct std::function blockedOnIoCallback_; - std::function decodingTimeMsCallback_; + std::function decodingTimeUsCallback_; std::function stripeCountCallback_; bool eagerFirstStripeLoad = true; uint64_t skipRows_ = 0; @@ -350,12 +350,12 @@ class RowReaderOptions { return blockedOnIoCallback_; } - void setDecodingTimeMsCallback(std::function decodingTimeMs) { - decodingTimeMsCallback_ = std::move(decodingTimeMs); + void setDecodingTimeUsCallback(std::function decodingTimeUs) { + decodingTimeUsCallback_ = std::move(decodingTimeUs); } - std::function getDecodingTimeMsCallback() const { - return decodingTimeMsCallback_; + std::function getDecodingTimeUsCallback() const { + return decodingTimeUsCallback_; } void setStripeCountCallback( diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 5e2fcc603f41..e0d5accb1088 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -15,6 +15,9 @@ */ #include "velox/dwio/dwrf/reader/DwrfReader.h" + +#include + #include "velox/dwio/common/TypeUtils.h" #include "velox/dwio/common/exception/Exception.h" #include "velox/dwio/dwrf/reader/ColumnReader.h" @@ -35,7 +38,7 @@ DwrfRowReader::DwrfRowReader( : StripeReaderBase(reader), options_(opts), executor_{options_.getDecodingExecutor()}, - decodingTimeMsCallback_{options_.getDecodingTimeMsCallback()}, + decodingTimeUsCallback_{options_.getDecodingTimeUsCallback()}, stripeCountCallback_{options_.getStripeCountCallback()}, columnSelector_{std::make_shared( ColumnSelector::apply(opts.getSelector(), reader->getSchema()))} { @@ -274,17 +277,23 @@ void DwrfRowReader::readNext( const dwio::common::Mutation* mutation, VectorPtr& result) { if (!selectiveColumnReader_) { - const auto startTime = std::chrono::high_resolution_clock::now(); + std::optional startTime; + if (decodingTimeUsCallback_) { + // We'll use wall time since we have parallel decoding. + // If we move to sequential decoding only, we can use CPU time. + startTime.emplace(std::chrono::steady_clock::now()); + } // TODO: Move row number appending logic here. Currently this is done in // the wrapper reader. VELOX_CHECK( mutation == nullptr, "Mutation pushdown is only supported in selective reader"); columnReader_->next(rowsToRead, result); - if (decodingTimeMsCallback_) { - auto decodingTime = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - startTime); - decodingTimeMsCallback_(decodingTime.count()); + if (startTime.has_value()) { + decodingTimeUsCallback_( + std::chrono::duration_cast( + std::chrono::steady_clock::now() - startTime.value()) + .count()); } return; } diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index b07e63403fe4..950f0829ba43 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -149,7 +149,7 @@ class DwrfRowReader : public StrideIndexProvider, std::shared_ptr stripeDictionaryCache_; dwio::common::RowReaderOptions options_; std::shared_ptr executor_; - std::function decodingTimeMsCallback_; + std::function decodingTimeUsCallback_; std::function stripeCountCallback_; struct PrefetchedStripeState {