From aa9ee2ea7661616497a35d17e3d4ed68d511d8ee Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Tue, 29 Oct 2024 17:12:25 -0700 Subject: [PATCH] Improve operator tracing and make it E2E work (#11360) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11360 This PR improves the operator trace framework and replay tool implementation: (1) Let driver execution framework to handle the trace input collection and summary file generation. The finish trace is either called when trace limit exceeds or on operator close instead of no more input. So it can capture more information in summary like peak memory usage. By removing the trace input collection into the driver framework it eases the trace input collection for a spilling operator. We add to capture the peak memory and input rows in summary so it helps identify the hot operator or task on replay debugging. (2) Change the trace storage layout to have root with query id for traces from different tasks (3) Abstract the replay tool function into TraceReplayRunner class and derive in Meta internal code repo to handle the Meta internal env setup and keep most common part in TraceReplayRunner for OSS (4) A couple of fixes in trace replay tool to make it E2E function in Meta for table writer use case (5) A couple of file/class renaming to make the file/class name to be more specific as currently we only support operator level trace collection and replay Reviewed By: tanjialiang Differential Revision: D64946367 --- velox/common/file/File.cpp | 5 +- velox/core/PlanNode.cpp | 6 +- velox/core/PlanNode.h | 17 +- velox/docs/develop/debugging/tracing.rst | 8 +- velox/exec/CMakeLists.txt | 15 +- velox/exec/Driver.cpp | 5 +- velox/exec/Driver.h | 4 +- velox/exec/HashAggregation.cpp | 1 - velox/exec/LocalPlanner.cpp | 11 +- velox/exec/Operator.cpp | 35 ++- velox/exec/Operator.h | 13 +- velox/exec/OperatorTraceReader.cpp | 79 +++++ ...ueryDataReader.h => OperatorTraceReader.h} | 33 +- ...eryTraceScan.cpp => OperatorTraceScan.cpp} | 31 +- .../{QueryTraceScan.h => OperatorTraceScan.h} | 11 +- ...DataWriter.cpp => OperatorTraceWriter.cpp} | 49 +-- ...ueryDataWriter.h => OperatorTraceWriter.h} | 25 +- velox/exec/PartitionedOutput.cpp | 2 - velox/exec/QueryDataReader.cpp | 59 ---- velox/exec/QueryTraceUtil.cpp | 111 ------- velox/exec/TableWriter.cpp | 2 +- velox/exec/Task.cpp | 55 ++-- velox/exec/Task.h | 12 +- ...MetadataReader.cpp => TaskTraceReader.cpp} | 26 +- ...ueryMetadataReader.h => TaskTraceReader.h} | 8 +- ...MetadataWriter.cpp => TaskTraceWriter.cpp} | 25 +- ...ueryMetadataWriter.h => TaskTraceWriter.h} | 7 +- velox/exec/Trace.cpp | 33 ++ velox/exec/{QueryTraceTraits.h => Trace.h} | 26 +- .../{QueryTraceConfig.cpp => TraceConfig.cpp} | 8 +- .../{QueryTraceConfig.h => TraceConfig.h} | 8 +- velox/exec/TraceUtil.cpp | 182 +++++++++++ velox/exec/{QueryTraceUtil.h => TraceUtil.h} | 79 ++++- velox/exec/tests/CMakeLists.txt | 3 +- ...eryTraceTest.cpp => OperatorTraceTest.cpp} | 287 ++++++++++-------- velox/exec/tests/TraceUtilTest.cpp | 188 ++++++++++++ velox/exec/tests/utils/OperatorTestBase.h | 2 - velox/exec/tests/utils/PlanBuilder.cpp | 5 +- velox/exec/tests/utils/PlanBuilder.h | 3 + velox/tool/trace/AggregationReplayer.cpp | 2 +- velox/tool/trace/AggregationReplayer.h | 6 +- velox/tool/trace/CMakeLists.txt | 2 +- velox/tool/trace/OperatorReplayerBase.cpp | 36 +-- velox/tool/trace/OperatorReplayerBase.h | 13 +- .../tool/trace/PartitionedOutputReplayer.cpp | 13 +- velox/tool/trace/PartitionedOutputReplayer.h | 3 +- velox/tool/trace/TableWriterReplayer.cpp | 2 +- velox/tool/trace/TableWriterReplayer.h | 11 +- ...ueryReplayer.cpp => TraceReplayRunner.cpp} | 258 ++++++++++------ velox/tool/trace/TraceReplayRunner.h | 54 ++++ velox/tool/trace/TraceReplayerMain.cpp | 28 ++ .../trace/tests/AggregationReplayerTest.cpp | 16 +- .../tests/PartitionedOutputReplayerTest.cpp | 16 +- .../trace/tests/TableWriterReplayerTest.cpp | 6 +- 54 files changed, 1288 insertions(+), 657 deletions(-) create mode 100644 velox/exec/OperatorTraceReader.cpp rename velox/exec/{QueryDataReader.h => OperatorTraceReader.h} (62%) rename velox/exec/{QueryTraceScan.cpp => OperatorTraceScan.cpp} (62%) rename velox/exec/{QueryTraceScan.h => OperatorTraceScan.h} (88%) rename velox/exec/{QueryDataWriter.cpp => OperatorTraceWriter.cpp} (61%) rename velox/exec/{QueryDataWriter.h => OperatorTraceWriter.h} (80%) delete mode 100644 velox/exec/QueryDataReader.cpp delete mode 100644 velox/exec/QueryTraceUtil.cpp rename velox/exec/{QueryMetadataReader.cpp => TaskTraceReader.cpp} (73%) rename velox/exec/{QueryMetadataReader.h => TaskTraceReader.h} (84%) rename velox/exec/{QueryMetadataWriter.cpp => TaskTraceWriter.cpp} (75%) rename velox/exec/{QueryMetadataWriter.h => TaskTraceWriter.h} (85%) create mode 100644 velox/exec/Trace.cpp rename velox/exec/{QueryTraceTraits.h => Trace.h} (62%) rename velox/exec/{QueryTraceConfig.cpp => TraceConfig.cpp} (88%) rename velox/exec/{QueryTraceConfig.h => TraceConfig.h} (92%) create mode 100644 velox/exec/TraceUtil.cpp rename velox/exec/{QueryTraceUtil.h => TraceUtil.h} (50%) rename velox/exec/tests/{QueryTraceTest.cpp => OperatorTraceTest.cpp} (68%) create mode 100644 velox/exec/tests/TraceUtilTest.cpp rename velox/tool/trace/{QueryReplayer.cpp => TraceReplayRunner.cpp} (57%) create mode 100644 velox/tool/trace/TraceReplayRunner.h create mode 100644 velox/tool/trace/TraceReplayerMain.cpp diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index 46156c2f1aa77..6a30f0a26159e 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -170,9 +170,10 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos) VELOX_CHECK_EQ( bytesRead, length, - "fread failure in LocalReadFile::PReadInternal, {} vs {}.", + "fread failure in LocalReadFile::PReadInternal, {} vs {}: {}", bytesRead, - length); + length, + folly::errnoStr(errno)); } std::string_view diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 9e3c1fc0fad32..609ad7183513a 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -2355,15 +2355,15 @@ folly::dynamic PlanNode::serialize() const { return obj; } -const std::vector& QueryTraceScanNode::sources() const { +const std::vector& TraceScanNode::sources() const { return kEmptySources; } -std::string QueryTraceScanNode::traceDir() const { +std::string TraceScanNode::traceDir() const { return traceDir_; } -void QueryTraceScanNode::addDetails(std::stringstream& stream) const { +void TraceScanNode::addDetails(std::stringstream& stream) const { stream << "Trace dir: " << traceDir_; } diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 331f3618c2864..c893a834fda8e 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -314,13 +314,17 @@ class ArrowStreamNode : public PlanNode { std::shared_ptr arrowStream_; }; -class QueryTraceScanNode final : public PlanNode { +class TraceScanNode final : public PlanNode { public: - QueryTraceScanNode( + TraceScanNode( const PlanNodeId& id, const std::string& traceDir, + uint32_t pipelineId, const RowTypePtr& outputType) - : PlanNode(id), traceDir_(traceDir), outputType_(outputType) {} + : PlanNode(id), + traceDir_(traceDir), + pipelineId_(pipelineId), + outputType_(outputType) {} const RowTypePtr& outputType() const override { return outputType_; @@ -333,17 +337,22 @@ class QueryTraceScanNode final : public PlanNode { } folly::dynamic serialize() const override { - VELOX_UNSUPPORTED("QueryReplayScanNode is not serializable"); + VELOX_UNSUPPORTED("TraceScanNode is not serializable"); return nullptr; } std::string traceDir() const; + uint32_t pipelineId() const { + return pipelineId_; + } + private: void addDetails(std::stringstream& stream) const override; // Directory of traced data, which is $traceRoot/$taskId/$nodeId. const std::string traceDir_; + const uint32_t pipelineId_; const RowTypePtr outputType_; }; diff --git a/velox/docs/develop/debugging/tracing.rst b/velox/docs/develop/debugging/tracing.rst index 1750296503e12..0536546d2a6ef 100644 --- a/velox/docs/develop/debugging/tracing.rst +++ b/velox/docs/develop/debugging/tracing.rst @@ -57,7 +57,7 @@ The tracing framework consists of three components: Query Trace Writer ^^^^^^^^^^^^^^^^^^ -**QueryMetadataWriter** records the query metadata during task creation, +**TaskTraceMetadataWriter** records the query metadata during task creation, serializes, and writes them into a file in JSON format. There are two kinds of metadata: @@ -66,20 +66,20 @@ of metadata: - Plan fragment of the task (also known as a plan node tree). It can be serialized as a JSON object, which is already supported in Velox. -**QueryDataWriter** records the input vectors from the target operator, which are +**OperatorTraceWriter** records the input vectors from the target operator, which are serialized and written as a binary file. Query Trace Reader ^^^^^^^^^^^^^^^^^^ -**QueryMetadataReader** can load the query metadata JSON file, and extract the query +**TaskTraceMetadataReader** can load the query metadata JSON file, and extract the query configurations, connector properties, and the plan fragment. **QueryDataReader** can read and deserialize the input vectors of the target operator. It is used as the utility to replay the input data as a source operator in the target operator replay. -**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches, +**NOTE**: `OperatorTraceWriter` serializes and flushes the input vectors in batches, allowing it to replay the input process using the same sequence of batches. Query Trace Util diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 9972f46b3ecb5..1b37277d70323 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -57,13 +57,14 @@ velox_add_library( OrderBy.cpp OutputBuffer.cpp OutputBufferManager.cpp - QueryDataReader.cpp - QueryDataWriter.cpp - QueryMetadataReader.cpp - QueryMetadataWriter.cpp - QueryTraceConfig.cpp - QueryTraceScan.cpp - QueryTraceUtil.cpp + OperatorTraceReader.cpp + OperatorTraceScan.cpp + OperatorTraceWriter.cpp + TaskTraceReader.cpp + TaskTraceWriter.cpp + Trace.h + TraceConfig.cpp + TraceUtil.cpp PartitionedOutput.cpp PartitionFunction.cpp PartitionStreamingWindowBuild.cpp diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 56614cf823fb1..27fe4275c5724 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -95,8 +95,8 @@ const core::QueryConfig& DriverCtx::queryConfig() const { return task->queryCtx()->queryConfig(); } -const std::optional& DriverCtx::traceConfig() const { - return task->queryTraceConfig(); +const std::optional& DriverCtx::traceConfig() const { + return task->traceConfig(); } velox::memory::MemoryPool* DriverCtx::addOperatorPool( @@ -618,6 +618,7 @@ StopReason Driver::runInternal( lockedStats->addInputVector( resultBytes, intermediateResult->size()); } + nextOp->traceInput(intermediateResult); TestValue::adjust( "facebook::velox::exec::Driver::runInternal::addInput", nextOp); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 2af297dfef112..1605d09e43164 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -27,7 +27,7 @@ #include "velox/common/time/CpuWallTimer.h" #include "velox/core/PlanFragment.h" #include "velox/core/QueryCtx.h" -#include "velox/exec/QueryTraceConfig.h" +#include "velox/exec/TraceConfig.h" namespace facebook::velox::exec { @@ -291,7 +291,7 @@ struct DriverCtx { const core::QueryConfig& queryConfig() const; - const std::optional& traceConfig() const; + const std::optional& traceConfig() const; velox::memory::MemoryPool* addOperatorPool( const core::PlanNodeId& planNodeId, diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index b15eab7bf441e..ef072d059d2c4 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -117,7 +117,6 @@ bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const { } void HashAggregation::addInput(RowVectorPtr input) { - traceInput(input); if (!pushdownChecked_) { mayPushdown_ = operatorCtx_->driver()->mayPushdownAggregation(this); pushdownChecked_ = true; diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index e75e489168a4e..aec3104be3a2c 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -32,9 +32,9 @@ #include "velox/exec/MergeJoin.h" #include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/NestedLoopJoinProbe.h" +#include "velox/exec/OperatorTraceScan.h" #include "velox/exec/OrderBy.h" #include "velox/exec/PartitionedOutput.h" -#include "velox/exec/QueryTraceScan.h" #include "velox/exec/RowNumber.h" #include "velox/exec/StreamingAggregation.h" #include "velox/exec/TableScan.h" @@ -595,11 +595,10 @@ std::shared_ptr DriverFactory::createDriver( assignUniqueIdNode->taskUniqueId(), assignUniqueIdNode->uniqueIdCounter())); } else if ( - const auto queryReplayScanNode = - std::dynamic_pointer_cast( - planNode)) { - operators.push_back(std::make_unique( - id, ctx.get(), queryReplayScanNode)); + const auto traceScanNode = + std::dynamic_pointer_cast(planNode)) { + operators.push_back(std::make_unique( + id, ctx.get(), traceScanNode)); } else { std::unique_ptr extended; if (planNode->requiresExchangeClient()) { diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 910e4cec76b2e..13fb40acfcdb6 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -19,10 +19,8 @@ #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/testutil/TestValue.h" #include "velox/exec/Driver.h" -#include "velox/exec/HashJoinBridge.h" #include "velox/exec/OperatorUtils.h" -#include "velox/exec/QueryTraceUtil.h" -#include "velox/exec/Task.h" +#include "velox/exec/TraceUtil.h" #include "velox/expression/Expr.h" using facebook::velox::common::testutil::TestValue; @@ -107,8 +105,8 @@ void Operator::maybeSetReclaimer() { } void Operator::maybeSetTracer() { - const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig(); - if (!queryTraceConfig.has_value()) { + const auto& traceConfig = operatorCtx_->driverCtx()->traceConfig(); + if (!traceConfig.has_value()) { return; } @@ -117,7 +115,7 @@ void Operator::maybeSetTracer() { } const auto nodeId = planNodeId(); - if (queryTraceConfig->queryNodes.count(nodeId) == 0) { + if (traceConfig->queryNodes.count(nodeId) == 0) { return; } @@ -134,20 +132,17 @@ void Operator::maybeSetTracer() { const auto pipelineId = operatorCtx_->driverCtx()->pipelineId; const auto driverId = operatorCtx_->driverCtx()->driverId; - LOG(INFO) << "Trace data for operator type: " << operatorType() + LOG(INFO) << "Trace input for operator type: " << operatorType() << ", operator id: " << operatorId() << ", pipeline: " << pipelineId << ", driver: " << driverId << ", task: " << taskId(); - const auto opTraceDirPath = fmt::format( - "{}/{}/{}/{}/data", - queryTraceConfig->queryTraceDir, - planNodeId(), - pipelineId, - driverId); + const auto opTraceDirPath = trace::getOpTraceDirectory( + traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId); trace::createTraceDirectory(opTraceDirPath); - inputTracer_ = std::make_unique( + inputTracer_ = std::make_unique( + this, opTraceDirPath, memory::traceMemoryPool(), - queryTraceConfig->updateAndCheckTraceLimitCB); + traceConfig->updateAndCheckTraceLimitCB); } void Operator::traceInput(const RowVectorPtr& input) { @@ -319,6 +314,16 @@ OperatorStats Operator::stats(bool clear) { return stats; } +void Operator::close() { + input_ = nullptr; + results_.clear(); + recordSpillStats(); + finishTrace(); + + // Release the unused memory reservation on close. + operatorCtx_->pool()->release(); +} + vector_size_t Operator::outputBatchRows( std::optional averageRowSize) const { const auto& queryConfig = operatorCtx_->task()->queryCtx()->queryConfig(); diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 20561b3e788de..2138fa47e3ccd 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -21,7 +21,7 @@ #include "velox/core/PlanNode.h" #include "velox/exec/Driver.h" #include "velox/exec/JoinBridge.h" -#include "velox/exec/QueryDataWriter.h" +#include "velox/exec/OperatorTraceWriter.h" #include "velox/exec/Spiller.h" #include "velox/type/Filter.h" @@ -408,7 +408,6 @@ class Operator : public BaseRuntimeStatWriter { /// e.g. the first operator in the pipeline. virtual void noMoreInput() { noMoreInput_ = true; - finishTrace(); } /// Returns a RowVector with the result columns. Returns nullptr if @@ -483,13 +482,7 @@ class Operator : public BaseRuntimeStatWriter { /// Frees all resources associated with 'this'. No other methods /// should be called after this. - virtual void close() { - input_ = nullptr; - results_.clear(); - recordSpillStats(); - // Release the unused memory reservation on close. - operatorCtx_->pool()->release(); - } + virtual void close(); // Returns true if 'this' never has more output rows than input rows. virtual bool isFilter() const { @@ -781,7 +774,7 @@ class Operator : public BaseRuntimeStatWriter { folly::Synchronized stats_; folly::Synchronized spillStats_; - std::unique_ptr inputTracer_; + std::unique_ptr inputTracer_; /// Indicates if an operator is under a non-reclaimable execution section. /// This prevents the memory arbitrator from reclaiming memory from this diff --git a/velox/exec/OperatorTraceReader.cpp b/velox/exec/OperatorTraceReader.cpp new file mode 100644 index 0000000000000..047d98810758f --- /dev/null +++ b/velox/exec/OperatorTraceReader.cpp @@ -0,0 +1,79 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/exec/OperatorTraceReader.h" + +#include "velox/exec/TraceUtil.h" + +namespace facebook::velox::exec::trace { + +OperatorTraceInputReader::OperatorTraceInputReader( + std::string traceDir, + RowTypePtr dataType, + memory::MemoryPool* pool) + : traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), + dataType_(std::move(dataType)), + pool_(pool), + inputStream_(getInputStream()) { + VELOX_CHECK_NOT_NULL(dataType_); + VELOX_CHECK_NOT_NULL(inputStream_); +} + +bool OperatorTraceInputReader::read(RowVectorPtr& batch) const { + if (inputStream_->atEnd()) { + batch = nullptr; + return false; + } + + VectorStreamGroup::read( + inputStream_.get(), pool_, dataType_, &batch, &readOptions_); + return true; +} + +std::unique_ptr +OperatorTraceInputReader::getInputStream() const { + auto traceFile = fs_->openFileForRead(getOpTraceInputFilePath(traceDir_)); + // TODO: Make the buffer size configurable. + return std::make_unique( + std::move(traceFile), 1 << 20, pool_); +} + +OperatorTraceSummaryReader::OperatorTraceSummaryReader( + std::string traceDir, + memory::MemoryPool* pool) + : traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), + pool_(pool), + summaryFile_(fs_->openFileForRead(getOpTraceSummaryFilePath(traceDir_))) { +} + +OperatorTraceSummary OperatorTraceSummaryReader::read() const { + VELOX_CHECK_NOT_NULL(summaryFile_); + const auto summaryStr = summaryFile_->pread(0, summaryFile_->size()); + VELOX_CHECK(!summaryStr.empty()); + + folly::dynamic summaryObj = folly::parseJson(summaryStr); + OperatorTraceSummary summary; + summary.exceededTraceLimit = + summaryObj[OperatorTraceTraits::kTraceLimitExceededKey].asBool(); + summary.peakMemory = summaryObj[OperatorTraceTraits::kPeakMemoryKey].asInt(); + summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt(); + return summary; +} +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryDataReader.h b/velox/exec/OperatorTraceReader.h similarity index 62% rename from velox/exec/QueryDataReader.h rename to velox/exec/OperatorTraceReader.h index b5e6d24e011aa..2236d73fe2f7c 100644 --- a/velox/exec/QueryDataReader.h +++ b/velox/exec/OperatorTraceReader.h @@ -18,16 +18,16 @@ #include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" -#include "velox/core/PlanNode.h" -#include "velox/core/QueryCtx.h" +#include "velox/exec/Trace.h" #include "velox/serializers/PrestoSerializer.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec::trace { -class QueryDataReader { +/// Used to read an operator trace input. +class OperatorTraceInputReader { public: - explicit QueryDataReader( + /// 'traceDir' specifies the operator trace directory. + OperatorTraceInputReader( std::string traceDir, RowTypePtr dataType, memory::MemoryPool* pool); @@ -37,16 +37,33 @@ class QueryDataReader { bool read(RowVectorPtr& batch) const; private: - std::unique_ptr getDataInputStream() const; + std::unique_ptr getInputStream() const; const std::string traceDir_; const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_{ true, common::CompressionKind_ZSTD, // TODO: Use trace config. - /*nullsFirst=*/true}; + /*_nullsFirst=*/true}; const std::shared_ptr fs_; const RowTypePtr dataType_; memory::MemoryPool* const pool_; - const std::unique_ptr dataStream_; + const std::unique_ptr inputStream_; +}; + +/// Used to read an operator trace summary. +class OperatorTraceSummaryReader { + public: + /// 'traceDir' specifies the operator trace directory. + OperatorTraceSummaryReader(std::string traceDir, memory::MemoryPool* pool); + + /// Read and return the operator trace 'summary'. The function throws if it + /// fails. + OperatorTraceSummary read() const; + + private: + const std::string traceDir_; + const std::shared_ptr fs_; + memory::MemoryPool* const pool_; + const std::unique_ptr summaryFile_; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceScan.cpp b/velox/exec/OperatorTraceScan.cpp similarity index 62% rename from velox/exec/QueryTraceScan.cpp rename to velox/exec/OperatorTraceScan.cpp index 5cc4f0b5c2230..a976367afa748 100644 --- a/velox/exec/QueryTraceScan.cpp +++ b/velox/exec/OperatorTraceScan.cpp @@ -14,33 +14,32 @@ * limitations under the License. */ -#include "velox/exec/QueryTraceScan.h" +#include "velox/exec/OperatorTraceScan.h" -#include "QueryTraceUtil.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryTraceScan::QueryTraceScan( +OperatorTraceScan::OperatorTraceScan( int32_t operatorId, DriverCtx* driverCtx, - const std::shared_ptr& queryTraceScanNode) + const std::shared_ptr& traceScanNode) : SourceOperator( driverCtx, - queryTraceScanNode->outputType(), + traceScanNode->outputType(), operatorId, - queryTraceScanNode->id(), - "QueryReplayScan") { - const auto dataDir = getDataDir( - queryTraceScanNode->traceDir(), - driverCtx->pipelineId, - driverCtx->driverId); - traceReader_ = std::make_unique( - dataDir, - queryTraceScanNode->outputType(), + traceScanNode->id(), + "OperatorTraceScan") { + traceReader_ = std::make_unique( + getOpTraceDirectory( + traceScanNode->traceDir(), + traceScanNode->pipelineId(), + driverCtx->driverId), + traceScanNode->outputType(), memory::MemoryManager::getInstance()->tracePool()); } -RowVectorPtr QueryTraceScan::getOutput() { +RowVectorPtr OperatorTraceScan::getOutput() { RowVectorPtr batch; if (traceReader_->read(batch)) { return batch; @@ -49,7 +48,7 @@ RowVectorPtr QueryTraceScan::getOutput() { return nullptr; } -bool QueryTraceScan::isFinished() { +bool OperatorTraceScan::isFinished() { return finished_; } diff --git a/velox/exec/QueryTraceScan.h b/velox/exec/OperatorTraceScan.h similarity index 88% rename from velox/exec/QueryTraceScan.h rename to velox/exec/OperatorTraceScan.h index 8bff5bca486d8..1542e43f2c414 100644 --- a/velox/exec/QueryTraceScan.h +++ b/velox/exec/OperatorTraceScan.h @@ -18,7 +18,7 @@ #include "velox/core/PlanNode.h" #include "velox/exec/Operator.h" -#include "velox/exec/QueryDataReader.h" +#include "velox/exec/OperatorTraceReader.h" namespace facebook::velox::exec::trace { /// This is a scan operator for query replay. It uses traced data from a @@ -38,13 +38,12 @@ namespace facebook::velox::exec::trace { /// It can be found from the QueryReplayScanNode. However the pipeline ID and /// driver ID are only known during operator creation, so we need to figure out /// the input traced data file and the output type dynamically. -class QueryTraceScan final : public SourceOperator { +class OperatorTraceScan final : public SourceOperator { public: - QueryTraceScan( + OperatorTraceScan( int32_t operatorId, DriverCtx* driverCtx, - const std::shared_ptr& - queryTraceScanNode); + const std::shared_ptr& traceScanNode); RowVectorPtr getOutput() override; @@ -55,7 +54,7 @@ class QueryTraceScan final : public SourceOperator { bool isFinished() override; private: - std::unique_ptr traceReader_; + std::unique_ptr traceReader_; bool finished_{false}; }; diff --git a/velox/exec/QueryDataWriter.cpp b/velox/exec/OperatorTraceWriter.cpp similarity index 61% rename from velox/exec/QueryDataWriter.cpp rename to velox/exec/OperatorTraceWriter.cpp index 7ab4e6d02ec0d..f082e199891b3 100644 --- a/velox/exec/QueryDataWriter.cpp +++ b/velox/exec/OperatorTraceWriter.cpp @@ -14,31 +14,32 @@ * limitations under the License. */ -#include "velox/exec/QueryDataWriter.h" +#include "velox/exec/OperatorTraceWriter.h" #include -#include "velox/common/base/SpillStats.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" -#include "velox/exec/QueryTraceTraits.h" -#include "velox/serializers/PrestoSerializer.h" +#include "velox/exec/Operator.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryDataWriter::QueryDataWriter( - std::string path, +OperatorTraceWriter::OperatorTraceWriter( + Operator* traceOp, + std::string traceDir, memory::MemoryPool* pool, UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB) - : dirPath_(std::move(path)), - fs_(filesystems::getFileSystem(dirPath_, nullptr)), + : traceOp_(traceOp), + traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), pool_(pool), updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) { - dataFile_ = fs_->openFileForWrite( - fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName)); - VELOX_CHECK_NOT_NULL(dataFile_); + traceFile_ = fs_->openFileForWrite(getOpTraceInputFilePath(traceDir_)); + VELOX_CHECK_NOT_NULL(traceFile_); } -void QueryDataWriter::write(const RowVectorPtr& rows) { +void OperatorTraceWriter::write(const RowVectorPtr& rows) { if (FOLLY_UNLIKELY(finished_)) { return; } @@ -66,32 +67,36 @@ void QueryDataWriter::write(const RowVectorPtr& rows) { finish(true); return; } - dataFile_->append(std::move(iobuf)); + traceFile_->append(std::move(iobuf)); } -void QueryDataWriter::finish(bool limitExceeded) { +void OperatorTraceWriter::finish(bool limitExceeded) { if (finished_) { return; } VELOX_CHECK_NOT_NULL( - dataFile_, "The query data writer has already been finished"); - dataFile_->close(); - dataFile_.reset(); + traceFile_, "The query data writer has already been finished"); + traceFile_->close(); + traceFile_.reset(); batch_.reset(); writeSummary(limitExceeded); finished_ = true; } -void QueryDataWriter::writeSummary(bool limitExceeded) const { - const auto summaryFilePath = - fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName); +void OperatorTraceWriter::writeSummary(bool limitExceeded) const { + const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; if (dataType_ != nullptr) { - obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize(); + obj[TraceTraits::kDataTypeKey] = dataType_->serialize(); } - obj[QueryTraceTraits::kTraceLimitExceededKey] = limitExceeded; + obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType(); + obj[OperatorTraceTraits::kTraceLimitExceededKey] = limitExceeded; + const auto stats = traceOp_->stats(/*clear=*/false); + obj[OperatorTraceTraits::kPeakMemoryKey] = + stats.memoryStats.peakTotalMemoryReservation; + obj[OperatorTraceTraits::kInputRowsKey] = stats.inputPositions; file->append(folly::toJson(obj)); file->close(); } diff --git a/velox/exec/QueryDataWriter.h b/velox/exec/OperatorTraceWriter.h similarity index 80% rename from velox/exec/QueryDataWriter.h rename to velox/exec/OperatorTraceWriter.h index 8e6073dcd3b77..0e5a80e22c3fe 100644 --- a/velox/exec/QueryDataWriter.h +++ b/velox/exec/OperatorTraceWriter.h @@ -16,21 +16,27 @@ #pragma once -#include "QueryTraceConfig.h" +#include "TraceConfig.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" -#include "velox/exec/QueryTraceTraits.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/VectorStream.h" +namespace facebook::velox::exec { +class Operator; +} + namespace facebook::velox::exec::trace { /// Used to serialize and write the input vectors from a given operator into a /// file. -class QueryDataWriter { +class OperatorTraceWriter { public: - explicit QueryDataWriter( - std::string path, + /// 'traceOp' is the operator to trace. 'traceDir' specifies the trace + /// directory for the operator. + explicit OperatorTraceWriter( + Operator* traceOp, + std::string traceDir, memory::MemoryPool* pool, UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB); @@ -40,7 +46,7 @@ class QueryDataWriter { /// Closes the data file and writes out the data summary. /// /// @param limitExceeded A flag indicates the written data bytes exceed the - /// limit causing the 'QueryDataWriter' to finish early. + /// limit causing the 'OperatorTraceWriter' to finish early. void finish(bool limitExceeded = false); private: @@ -49,7 +55,8 @@ class QueryDataWriter { // TODO: add more summaries such as number of rows etc. void writeSummary(bool limitExceeded = false) const; - const std::string dirPath_; + Operator* const traceOp_; + const std::string traceDir_; // TODO: make 'useLosslessTimestamp' configuerable. const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = { true, @@ -58,9 +65,11 @@ class QueryDataWriter { const std::shared_ptr fs_; memory::MemoryPool* const pool_; const UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB_; - std::unique_ptr dataFile_; + + std::unique_ptr traceFile_; TypePtr dataType_; std::unique_ptr batch_; + bool limitExceeded_{false}; bool finished_{false}; }; diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index cabff06611466..6b49fe22efa8e 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -233,8 +233,6 @@ void PartitionedOutput::estimateRowSizes() { } void PartitionedOutput::addInput(RowVectorPtr input) { - traceInput(input); - initializeInput(std::move(input)); initializeDestinations(); initializeSizeBuffers(); diff --git a/velox/exec/QueryDataReader.cpp b/velox/exec/QueryDataReader.cpp deleted file mode 100644 index ba08205dd8f25..0000000000000 --- a/velox/exec/QueryDataReader.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include "velox/exec/QueryDataReader.h" - -#include "velox/common/file/File.h" -#include "velox/exec/QueryTraceTraits.h" - -namespace facebook::velox::exec::trace { - -QueryDataReader::QueryDataReader( - std::string traceDir, - RowTypePtr dataType, - memory::MemoryPool* pool) - : traceDir_(std::move(traceDir)), - fs_(filesystems::getFileSystem(traceDir_, nullptr)), - dataType_(std::move(dataType)), - pool_(pool), - dataStream_(getDataInputStream()) { - VELOX_CHECK_NOT_NULL(dataType_); - VELOX_CHECK_NOT_NULL(dataStream_); -} - -bool QueryDataReader::read(RowVectorPtr& batch) const { - if (dataStream_->atEnd()) { - batch = nullptr; - return false; - } - - VectorStreamGroup::read( - dataStream_.get(), pool_, dataType_, &batch, &readOptions_); - return true; -} - -std::unique_ptr QueryDataReader::getDataInputStream() - const { - auto dataFile = fs_->openFileForRead( - fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataFileName)); - // TODO: Make the buffer size configurable. - return std::make_unique( - std::move(dataFile), 1 << 20, pool_); -} - -} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceUtil.cpp b/velox/exec/QueryTraceUtil.cpp deleted file mode 100644 index f3a339eec080a..0000000000000 --- a/velox/exec/QueryTraceUtil.cpp +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/exec/QueryTraceUtil.h" - -#include - -#include "velox/common/base/Exceptions.h" -#include "velox/common/file/File.h" -#include "velox/common/file/FileSystems.h" -#include "velox/exec/QueryTraceTraits.h" - -namespace facebook::velox::exec::trace { - -void createTraceDirectory(const std::string& traceDir) { - try { - const auto fs = filesystems::getFileSystem(traceDir, nullptr); - if (fs->exists(traceDir)) { - fs->rmdir(traceDir); - } - fs->mkdir(traceDir); - } catch (const std::exception& e) { - VELOX_FAIL( - "Failed to create trace directory '{}' with error: {}", - traceDir, - e.what()); - } -} - -std::vector getTaskIds( - const std::string& traceDir, - const std::shared_ptr& fs) { - VELOX_USER_CHECK(fs->exists(traceDir), "{} dose not exist", traceDir); - try { - const auto taskDirs = fs->list(traceDir); - std::vector taskIds; - for (const auto& taskDir : taskDirs) { - std::vector pathNodes; - folly::split("/", taskDir, pathNodes); - taskIds.emplace_back(std::move(pathNodes.back())); - } - return taskIds; - } catch (const std::exception& e) { - VELOX_FAIL( - "Failed to list the directory '{}' with error: {}", traceDir, e.what()); - } -} - -folly::dynamic getMetadata( - const std::string& metadataFile, - const std::shared_ptr& fs) { - try { - const auto file = fs->openFileForRead(metadataFile); - VELOX_CHECK_NOT_NULL(file); - const auto metadata = file->pread(0, file->size()); - VELOX_USER_CHECK(!metadata.empty()); - return folly::parseJson(metadata); - } catch (const std::exception& e) { - VELOX_FAIL( - "Failed to get the query metadata from '{}' with error: {}", - metadataFile, - e.what()); - } -} - -RowTypePtr getDataType( - const core::PlanNodePtr& tracedPlan, - const std::string& tracedNodeId, - size_t sourceIndex) { - const auto* traceNode = core::PlanNode::findFirstNode( - tracedPlan.get(), [&tracedNodeId](const core::PlanNode* node) { - return node->id() == tracedNodeId; - }); - VELOX_CHECK_NOT_NULL( - traceNode, - "traced node id {} not found in the traced plan", - tracedNodeId); - return traceNode->sources().at(sourceIndex)->outputType(); -} - -uint8_t getNumDrivers( - const std::string& rootDir, - const std::string& taskId, - const std::string& nodeId, - int32_t pipelineId, - const std::shared_ptr& fs) { - const auto traceDir = - fmt::format("{}/{}/{}/{}", rootDir, taskId, nodeId, pipelineId); - const auto driverDirs = fs->list(traceDir); - return driverDirs.size(); -} - -std::string -getDataDir(const std::string& traceDir, int pipelineId, int driverId) { - return fmt::format("{}/{}/{}/data", traceDir, pipelineId, driverId); -} - -} // namespace facebook::velox::exec::trace diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 3d289621f38a0..4899e4b4ebf89 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -126,7 +126,7 @@ void TableWriter::addInput(RowVectorPtr input) { if (input->size() == 0) { return; } - traceInput(input); + std::vector mappedChildren; mappedChildren.reserve(inputMapping_.size()); for (const auto i : inputMapping_) { diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 22e68d6c475d1..fb258c7c87926 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -30,8 +30,8 @@ #include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/OutputBufferManager.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/Task.h" +#include "velox/exec/TraceUtil.h" using facebook::velox::common::testutil::TestValue; @@ -304,7 +304,7 @@ Task::Task( dynamic_cast(queryCtx_->executor())); } - maybeInitQueryTrace(); + maybeInitTrace(); } Task::~Task() { @@ -2888,7 +2888,7 @@ std::shared_ptr Task::getExchangeClientLocked( return exchangeClients_[pipelineId]; } -std::optional Task::maybeMakeTraceConfig() const { +std::optional Task::maybeMakeTraceConfig() const { const auto& queryConfig = queryCtx_->queryConfig(); if (!queryConfig.queryTraceEnabled()) { return std::nullopt; @@ -2906,41 +2906,60 @@ std::optional Task::maybeMakeTraceConfig() const { return std::nullopt; } - const auto traceDir = - fmt::format("{}/{}", queryConfig.queryTraceDir(), taskId_); - const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); - if (queryTraceNodes.empty()) { + const auto traceDir = trace::getTaskTraceDirectory( + queryConfig.queryTraceDir(), queryCtx_->queryId(), taskId_); + const auto traceNodes = queryConfig.queryTraceNodeIds(); + if (traceNodes.empty()) { LOG(INFO) << "Trace metadata for task: " << taskId_; - return trace::QueryTraceConfig(traceDir); + return trace::TraceConfig(traceDir); + } + + std::vector traceNodeIds; + folly::split(',', traceNodes, traceNodeIds); + std::unordered_set traceNodeIdSet( + traceNodeIds.begin(), traceNodeIds.end()); + VELOX_CHECK_EQ(traceNodeIdSet.size(), traceNodeIds.size()); + + bool foundTraceNode{false}; + for (const auto& traceNodeId : traceNodeIds) { + if (core::PlanNode::findFirstNode( + planFragment_.planNode.get(), + [traceNodeId](const core::PlanNode* node) -> bool { + return node->id() == traceNodeId; + })) { + foundTraceNode = true; + break; + } + } + if (!foundTraceNode) { + LOG(WARNING) << "Trace plan nodes not found for task " << taskId_ << ": " + << folly::join(",", traceNodeIdSet); + return std::nullopt; } - std::vector nodes; - folly::split(',', queryTraceNodes, nodes); - std::unordered_set nodeSet(nodes.begin(), nodes.end()); - VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); LOG(INFO) << "Trace data for task " << taskId_ << " with plan nodes " - << queryTraceNodes; + << traceNodes; trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB = [this](uint64_t bytes) { return queryCtx_->updateTracedBytesAndCheckLimit(bytes); }; - return trace::QueryTraceConfig( - std::move(nodeSet), + return trace::TraceConfig( + std::move(traceNodeIdSet), traceDir, std::move(updateAndCheckTraceLimitCB), queryConfig.queryTraceTaskRegExp()); } -void Task::maybeInitQueryTrace() { +void Task::maybeInitTrace() { if (!traceConfig_) { return; } trace::createTraceDirectory(traceConfig_->queryTraceDir); - const auto queryMetadatWriter = std::make_unique( + const auto metadataWriter = std::make_unique( traceConfig_->queryTraceDir, memory::traceMemoryPool()); - queryMetadatWriter->write(queryCtx_, planFragment_.planNode); + metadataWriter->write(queryCtx_, planFragment_.planNode); } void Task::testingVisitDrivers(const std::function& callback) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 526ab710400cb..7f421952ce51c 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -21,11 +21,11 @@ #include "velox/exec/LocalPartition.h" #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/MergeSource.h" -#include "velox/exec/QueryMetadataWriter.h" -#include "velox/exec/QueryTraceConfig.h" #include "velox/exec/Split.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" +#include "velox/exec/TaskTraceWriter.h" +#include "velox/exec/TraceConfig.h" #include "velox/vector/ComplexVector.h" namespace facebook::velox::exec { @@ -139,7 +139,7 @@ class Task : public std::enable_shared_from_this { } /// Returns query trace config if specified. - const std::optional& queryTraceConfig() const { + const std::optional& traceConfig() const { return traceConfig_; } @@ -980,11 +980,11 @@ class Task : public std::enable_shared_from_this { int32_t pipelineId) const; // Builds the query trace config. - std::optional maybeMakeTraceConfig() const; + std::optional maybeMakeTraceConfig() const; // Create a 'QueryMetadtaWriter' to trace the query metadata if the query // trace enabled. - void maybeInitQueryTrace(); + void maybeInitTrace(); // Universally unique identifier of the task. Used to identify the task when // calling TaskListener. @@ -1004,7 +1004,7 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment_; - const std::optional traceConfig_; + const std::optional traceConfig_; // Hook in the system wide task list. TaskListEntry taskListEntry_; diff --git a/velox/exec/QueryMetadataReader.cpp b/velox/exec/TaskTraceReader.cpp similarity index 73% rename from velox/exec/QueryMetadataReader.cpp rename to velox/exec/TaskTraceReader.cpp index f9fdc9ec2e80f..6ed295190f061 100644 --- a/velox/exec/QueryMetadataReader.cpp +++ b/velox/exec/TaskTraceReader.cpp @@ -14,44 +14,40 @@ * limitations under the License. */ -#include "velox/exec/QueryMetadataReader.h" +#include "velox/exec/TaskTraceReader.h" -#include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" -#include "velox/exec/QueryTraceTraits.h" -#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryMetadataReader::QueryMetadataReader( +TaskTraceMetadataReader::TaskTraceMetadataReader( std::string traceDir, memory::MemoryPool* pool) : traceDir_(std::move(traceDir)), fs_(filesystems::getFileSystem(traceDir_, nullptr)), - metaFilePath_(fmt::format( - "{}/{}", - traceDir_, - QueryTraceTraits::kQueryMetaFileName)), + traceFilePath_(getTaskTraceMetaFilePath(traceDir_)), pool_(pool) { VELOX_CHECK_NOT_NULL(fs_); - VELOX_CHECK(fs_->exists(metaFilePath_)); + VELOX_CHECK(fs_->exists(traceFilePath_)); } -void QueryMetadataReader::read( +void TaskTraceMetadataReader::read( std::unordered_map& queryConfigs, std::unordered_map< std::string, std::unordered_map>& connectorProperties, core::PlanNodePtr& queryPlan) const { - folly::dynamic metaObj = getMetadata(metaFilePath_, fs_); - const auto& queryConfigObj = metaObj[QueryTraceTraits::kQueryConfigKey]; + folly::dynamic metaObj = getTaskMetadata(traceFilePath_, fs_); + const auto& queryConfigObj = metaObj[TraceTraits::kQueryConfigKey]; for (const auto& [key, value] : queryConfigObj.items()) { queryConfigs[key.asString()] = value.asString(); } const auto& connectorPropertiesObj = - metaObj[QueryTraceTraits::kConnectorPropertiesKey]; + metaObj[TraceTraits::kConnectorPropertiesKey]; for (const auto& [connectorId, configs] : connectorPropertiesObj.items()) { const auto connectorIdStr = connectorId.asString(); connectorProperties[connectorIdStr] = {}; @@ -61,6 +57,6 @@ void QueryMetadataReader::read( } queryPlan = ISerializable::deserialize( - metaObj[QueryTraceTraits::kPlanNodeKey], pool_); + metaObj[TraceTraits::kPlanNodeKey], pool_); } } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryMetadataReader.h b/velox/exec/TaskTraceReader.h similarity index 84% rename from velox/exec/QueryMetadataReader.h rename to velox/exec/TaskTraceReader.h index 71217653d4dc3..df3abcffc3b69 100644 --- a/velox/exec/QueryMetadataReader.h +++ b/velox/exec/TaskTraceReader.h @@ -18,13 +18,11 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" -#include "velox/core/QueryCtx.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec::trace { -class QueryMetadataReader { +class TaskTraceMetadataReader { public: - explicit QueryMetadataReader(std::string traceDir, memory::MemoryPool* pool); + TaskTraceMetadataReader(std::string traceDir, memory::MemoryPool* pool); void read( std::unordered_map& queryConfigs, @@ -36,7 +34,7 @@ class QueryMetadataReader { private: const std::string traceDir_; const std::shared_ptr fs_; - const std::string metaFilePath_; + const std::string traceFilePath_; memory::MemoryPool* const pool_; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryMetadataWriter.cpp b/velox/exec/TaskTraceWriter.cpp similarity index 75% rename from velox/exec/QueryMetadataWriter.cpp rename to velox/exec/TaskTraceWriter.cpp index a8bbe6633c668..cbf0413fd0b4e 100644 --- a/velox/exec/QueryMetadataWriter.cpp +++ b/velox/exec/TaskTraceWriter.cpp @@ -14,30 +14,27 @@ * limitations under the License. */ -#include "velox/exec/QueryMetadataWriter.h" -#include "velox/common/config/Config.h" +#include "velox/exec/TaskTraceWriter.h" #include "velox/common/file/File.h" #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" -#include "velox/exec/QueryTraceTraits.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryMetadataWriter::QueryMetadataWriter( +TaskTraceMetadataWriter::TaskTraceMetadataWriter( std::string traceDir, memory::MemoryPool* pool) : traceDir_(std::move(traceDir)), fs_(filesystems::getFileSystem(traceDir_, nullptr)), - metaFilePath_(fmt::format( - "{}/{}", - traceDir_, - QueryTraceTraits::kQueryMetaFileName)), + traceFilePath_(getTaskTraceMetaFilePath(traceDir_)), pool_(pool) { VELOX_CHECK_NOT_NULL(fs_); - VELOX_CHECK(!fs_->exists(metaFilePath_)); + VELOX_CHECK(!fs_->exists(traceFilePath_)); } -void QueryMetadataWriter::write( +void TaskTraceMetadataWriter::write( const std::shared_ptr& queryCtx, const core::PlanNodePtr& planNode) { VELOX_CHECK(!finished_, "Query metadata can only be written once"); @@ -59,12 +56,12 @@ void QueryMetadataWriter::write( } folly::dynamic metaObj = folly::dynamic::object; - metaObj[QueryTraceTraits::kQueryConfigKey] = queryConfigObj; - metaObj[QueryTraceTraits::kConnectorPropertiesKey] = connectorPropertiesObj; - metaObj[QueryTraceTraits::kPlanNodeKey] = planNode->serialize(); + metaObj[TraceTraits::kQueryConfigKey] = queryConfigObj; + metaObj[TraceTraits::kConnectorPropertiesKey] = connectorPropertiesObj; + metaObj[TraceTraits::kPlanNodeKey] = planNode->serialize(); const auto metaStr = folly::toJson(metaObj); - const auto file = fs_->openFileForWrite(metaFilePath_); + const auto file = fs_->openFileForWrite(traceFilePath_); file->append(metaStr); file->close(); } diff --git a/velox/exec/QueryMetadataWriter.h b/velox/exec/TaskTraceWriter.h similarity index 85% rename from velox/exec/QueryMetadataWriter.h rename to velox/exec/TaskTraceWriter.h index f2cc661238514..5b3105104dd6c 100644 --- a/velox/exec/QueryMetadataWriter.h +++ b/velox/exec/TaskTraceWriter.h @@ -19,12 +19,11 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec::trace { -class QueryMetadataWriter { +class TaskTraceMetadataWriter { public: - explicit QueryMetadataWriter(std::string traceDir, memory::MemoryPool* pool); + TaskTraceMetadataWriter(std::string traceDir, memory::MemoryPool* pool); void write( const std::shared_ptr& queryCtx, @@ -33,7 +32,7 @@ class QueryMetadataWriter { private: const std::string traceDir_; const std::shared_ptr fs_; - const std::string metaFilePath_; + const std::string traceFilePath_; memory::MemoryPool* const pool_; bool finished_{false}; }; diff --git a/velox/exec/Trace.cpp b/velox/exec/Trace.cpp new file mode 100644 index 0000000000000..4d4d86df0e969 --- /dev/null +++ b/velox/exec/Trace.cpp @@ -0,0 +1,33 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/Trace.h" + +#include + +#include "velox/common/base/SuccinctPrinter.h" + +namespace facebook::velox::exec::trace { + +std::string OperatorTraceSummary::toString() const { + return fmt::format( + "opType {}, exceededTraceLimit {}, inputRows {}, peakMemory {}", + opType, + exceededTraceLimit, + inputRows, + succinctBytes(peakMemory)); +} +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceTraits.h b/velox/exec/Trace.h similarity index 62% rename from velox/exec/QueryTraceTraits.h rename to velox/exec/Trace.h index ad817115b4902..4e612122be0d4 100644 --- a/velox/exec/QueryTraceTraits.h +++ b/velox/exec/Trace.h @@ -20,16 +20,34 @@ namespace facebook::velox::exec::trace { /// Defines the shared constants used by query trace implementation. -struct QueryTraceTraits { +struct TraceTraits { static inline const std::string kPlanNodeKey = "planNode"; static inline const std::string kQueryConfigKey = "queryConfig"; static inline const std::string kDataTypeKey = "rowType"; static inline const std::string kConnectorPropertiesKey = "connectorProperties"; + + static inline const std::string kTaskMetaFileName = "task_trace_meta.json"; +}; + +struct OperatorTraceTraits { + static inline const std::string kSummaryFileName = "op_trace_summary.json"; + static inline const std::string kInputFileName = "op_input_trace.data"; + + /// Keys for operator trace summary file. + static inline const std::string kOpTypeKey = "opType"; static inline const std::string kTraceLimitExceededKey = "traceLimitExceeded"; + static inline const std::string kPeakMemoryKey = "peakMemory"; + static inline const std::string kInputRowsKey = "inputhRows"; +}; + +/// Contains the summary of an operator trace. +struct OperatorTraceSummary { + std::string opType; + bool exceededTraceLimit{false}; + uint64_t inputRows{0}; + uint64_t peakMemory{0}; - static inline const std::string kQueryMetaFileName = "query_meta.json"; - static inline const std::string kDataSummaryFileName = "data_summary.json"; - static inline const std::string kDataFileName = "trace.data"; + std::string toString() const; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceConfig.cpp b/velox/exec/TraceConfig.cpp similarity index 88% rename from velox/exec/QueryTraceConfig.cpp rename to velox/exec/TraceConfig.cpp index d43479d646089..7e90f1edec5b9 100644 --- a/velox/exec/QueryTraceConfig.cpp +++ b/velox/exec/TraceConfig.cpp @@ -14,13 +14,13 @@ * limitations under the License. */ -#include "velox/exec/QueryTraceConfig.h" +#include "velox/exec/TraceConfig.h" #include namespace facebook::velox::exec::trace { -QueryTraceConfig::QueryTraceConfig( +TraceConfig::TraceConfig( std::unordered_set _queryNodeIds, std::string _queryTraceDir, UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB, @@ -30,8 +30,8 @@ QueryTraceConfig::QueryTraceConfig( updateAndCheckTraceLimitCB(std::move(_updateAndCheckTraceLimitCB)), taskRegExp(std::move(_taskRegExp)) {} -QueryTraceConfig::QueryTraceConfig(std::string _queryTraceDir) - : QueryTraceConfig( +TraceConfig::TraceConfig(std::string _queryTraceDir) + : TraceConfig( std::unordered_set{}, std::move(_queryTraceDir), [](uint64_t) { return false; }, diff --git a/velox/exec/QueryTraceConfig.h b/velox/exec/TraceConfig.h similarity index 92% rename from velox/exec/QueryTraceConfig.h rename to velox/exec/TraceConfig.h index 200e5cb52ce4a..2b086b39d4b38 100644 --- a/velox/exec/QueryTraceConfig.h +++ b/velox/exec/TraceConfig.h @@ -27,7 +27,7 @@ namespace facebook::velox::exec::trace { /// bytes exceed the set limit otherwise return false. using UpdateAndCheckTraceLimitCB = std::function; -struct QueryTraceConfig { +struct TraceConfig { /// Target query trace nodes. std::unordered_set queryNodes; /// Base dir of query trace. @@ -36,14 +36,14 @@ struct QueryTraceConfig { /// The trace task regexp. std::string taskRegExp; - QueryTraceConfig( + TraceConfig( std::unordered_set _queryNodeIds, std::string _queryTraceDir, UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB, std::string _taskRegExp); - QueryTraceConfig(std::string _queryTraceDir); + TraceConfig(std::string _queryTraceDir); - QueryTraceConfig() = default; + TraceConfig() = default; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/TraceUtil.cpp b/velox/exec/TraceUtil.cpp new file mode 100644 index 0000000000000..4df3b3f12e100 --- /dev/null +++ b/velox/exec/TraceUtil.cpp @@ -0,0 +1,182 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/TraceUtil.h" + +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/file/File.h" +#include "velox/common/file/FileSystems.h" +#include "velox/exec/Trace.h" + +namespace facebook::velox::exec::trace { +namespace { +std::string findLastPathNode(const std::string& path) { + std::vector pathNodes; + folly::split("/", path, pathNodes); + while (!pathNodes.empty() && pathNodes.back().empty()) { + pathNodes.pop_back(); + } + VELOX_CHECK(!pathNodes.empty(), "No valid path nodes found from {}", path); + return pathNodes.back(); +} +} // namespace + +void createTraceDirectory(const std::string& traceDir) { + try { + const auto fs = filesystems::getFileSystem(traceDir, nullptr); + if (fs->exists(traceDir)) { + fs->rmdir(traceDir); + } + fs->mkdir(traceDir); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to create trace directory '{}' with error: {}", + traceDir, + e.what()); + } +} + +std::string getQueryTraceDirectory( + const std::string& traceDir, + const std::string& queryId) { + return fmt::format("{}/{}", traceDir, queryId); +} + +std::string getTaskTraceDirectory( + const std::string& traceDir, + const Task& task) { + return getTaskTraceDirectory( + traceDir, task.queryCtx()->queryId(), task.taskId()); +} + +std::string getTaskTraceDirectory( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId) { + return fmt::format( + "{}/{}", getQueryTraceDirectory(traceDir, queryId), taskId); +} + +std::string getTaskTraceMetaFilePath(const std::string& taskTraceDir) { + return fmt::format("{}/{}", taskTraceDir, TraceTraits::kTaskMetaFileName); +} + +std::string getNodeTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId) { + return fmt::format("{}/{}", taskTraceDir, nodeId); +} + +std::string getPipelineTraceDirectory( + const std::string& nodeTraceDir, + uint32_t pipelineId) { + return fmt::format("{}/{}", nodeTraceDir, pipelineId); +} + +std::string getOpTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId, + uint32_t pipelineId, + uint32_t driverId) { + return getOpTraceDirectory( + getNodeTraceDirectory(taskTraceDir, nodeId), pipelineId, driverId); +} + +std::string getOpTraceDirectory( + const std::string& nodeTraceDir, + int pipelineId, + int driverId) { + return fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, driverId); +} + +std::string getOpTraceInputFilePath(const std::string& opTraceDir) { + return fmt::format("{}/{}", opTraceDir, OperatorTraceTraits::kInputFileName); +} + +std::string getOpTraceSummaryFilePath(const std::string& opTraceDir) { + return fmt::format( + "{}/{}", opTraceDir, OperatorTraceTraits::kSummaryFileName); +} + +std::vector getTaskIds( + const std::string& traceDir, + const std::string& queryId, + const std::shared_ptr& fs) { + const auto queryTraceDir = getQueryTraceDirectory(traceDir, queryId); + VELOX_USER_CHECK( + fs->exists(queryTraceDir), "{} dose not exist", queryTraceDir); + const auto taskDirs = fs->list(queryTraceDir); + std::vector taskIds; + for (const auto& taskDir : taskDirs) { + taskIds.emplace_back(findLastPathNode(taskDir)); + } + return taskIds; +} + +folly::dynamic getTaskMetadata( + const std::string& taskMetaFilePath, + const std::shared_ptr& fs) { + try { + const auto file = fs->openFileForRead(taskMetaFilePath); + VELOX_CHECK_NOT_NULL(file); + const auto taskMeta = file->pread(0, file->size()); + VELOX_USER_CHECK(!taskMeta.empty()); + return folly::parseJson(taskMeta); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to get the query metadata from '{}' with error: {}", + taskMetaFilePath, + e.what()); + } +} + +RowTypePtr getDataType( + const core::PlanNodePtr& tracedPlan, + const std::string& tracedNodeId, + size_t sourceIndex) { + const auto* traceNode = core::PlanNode::findFirstNode( + tracedPlan.get(), [&tracedNodeId](const core::PlanNode* node) { + return node->id() == tracedNodeId; + }); + VELOX_CHECK_NOT_NULL( + traceNode, + "traced node id {} not found in the traced plan", + tracedNodeId); + return traceNode->sources().at(sourceIndex)->outputType(); +} + +std::vector listDriverIds( + const std::string& nodeTraceDir, + uint32_t pipelineId, + const std::shared_ptr& fs) { + const auto pipelineDir = getPipelineTraceDirectory(nodeTraceDir, pipelineId); + const auto driverDirs = fs->list(pipelineDir); + std::vector driverIds; + for (const auto& driverDir : driverDirs) { + driverIds.emplace_back(folly::to(findLastPathNode(driverDir))); + } + return driverIds; +} + +size_t getNumDrivers( + const std::string& nodeTraceDir, + uint32_t pipelineId, + const std::shared_ptr& fs) { + return listDriverIds(nodeTraceDir, pipelineId, fs).size(); +} +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceUtil.h b/velox/exec/TraceUtil.h similarity index 50% rename from velox/exec/QueryTraceUtil.h rename to velox/exec/TraceUtil.h index 633c0bc27b2a8..7b226844b7595 100644 --- a/velox/exec/QueryTraceUtil.h +++ b/velox/exec/TraceUtil.h @@ -20,6 +20,7 @@ #include #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" +#include "velox/exec/Task.h" #include "velox/type/Type.h" #include @@ -29,6 +30,52 @@ namespace facebook::velox::exec::trace { /// Creates a directory to store the query trace metdata and data. void createTraceDirectory(const std::string& traceDir); +/// Returns the trace directory for a given query. +std::string getQueryTraceDirectory( + const std::string& traceDir, + const std::string& queryId); + +/// Returns the trace directory for a given query task. +std::string getTaskTraceDirectory( + const std::string& traceDir, + const Task& task); + +std::string getTaskTraceDirectory( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId); + +/// Returns the file path for a given task's metadata trace file. +std::string getTaskTraceMetaFilePath(const std::string& taskTraceDir); + +/// Returns the trace directory for a given traced plan node. +std::string getNodeTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId); + +/// Returns the trace directory for a given traced pipeline. +std::string getPipelineTraceDirectory( + const std::string& nodeTraceDir, + uint32_t pipelineId); + +/// Returns the trace directory for a given traced operator. +std::string getOpTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId, + uint32_t pipelineId, + uint32_t driverId); + +std::string getOpTraceDirectory( + const std::string& nodeTraceDir, + int pipelineId, + int driverId); + +/// Returns the file path for a given operator's traced input file. +std::string getOpTraceInputFilePath(const std::string& opTraceDir); + +/// Returns the file path for a given operator's traced input file. +std::string getOpTraceSummaryFilePath(const std::string& opTraceDir); + /// Extracts the input data type for the trace scan operator. The function first /// uses the traced node id to find traced operator's plan node from the traced /// plan fragment. Then it uses the specified source node index to find the @@ -48,28 +95,32 @@ RowTypePtr getDataType( const std::string& tracedNodeId, size_t sourceIndex = 0); +/// Extracts the driver ids by listing the sub-directors under the trace +/// directory for a given pipeline and decode the sub-directory names to get +/// driver id. 'nodeTraceDir' is the trace directory of the plan node. +std::vector listDriverIds( + const std::string& nodeTraceDir, + uint32_t pipelineId, + const std::shared_ptr& fs); + /// Extracts the number of drivers by listing the number of sub-directors under -/// the trace directory for a given pipeline. -uint8_t getNumDrivers( - const std::string& rootDir, - const std::string& taskId, - const std::string& nodeId, - int32_t pipelineId, +/// the trace directory for a given pipeline. 'nodeTraceDir' is the trace +/// directory of the plan node. +size_t getNumDrivers( + const std::string& nodeTraceDir, + uint32_t pipelineId, const std::shared_ptr& fs); -/// Extracts task ids of the query tracing by listing the trace directory. +/// Extracts task ids of the query tracing by listing the query trace directory. +/// 'traceDir' is the root trace directory. 'queryId' is the query id. std::vector getTaskIds( const std::string& traceDir, + const std::string& queryId, const std::shared_ptr& fs); /// Gets the metadata from a given task metadata file which includes query plan, /// configs and connector properties. -folly::dynamic getMetadata( - const std::string& metadataFile, +folly::dynamic getTaskMetadata( + const std::string& taskMetaFilePath, const std::shared_ptr& fs); - -/// Gets the traced data directory. 'traceaDir' is the trace directory for a -/// given plan node, which is $traceRoot/$taskId/$nodeId. -std::string -getDataDir(const std::string& traceDir, int pipelineId, int driverId); } // namespace facebook::velox::exec::trace diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 8904fdc206482..9b228e302463f 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -60,14 +60,15 @@ add_executable( MultiFragmentTest.cpp NestedLoopJoinTest.cpp OrderByTest.cpp + OperatorTraceTest.cpp OutputBufferManagerTest.cpp PartitionedOutputTest.cpp PlanNodeSerdeTest.cpp PlanNodeToStringTest.cpp PrefixSortTest.cpp PrintPlanWithStatsTest.cpp - QueryTraceTest.cpp ProbeOperatorStateTest.cpp + TraceUtilTest.cpp RoundRobinPartitionFunctionTest.cpp RowContainerTest.cpp RowNumberTest.cpp diff --git a/velox/exec/tests/QueryTraceTest.cpp b/velox/exec/tests/OperatorTraceTest.cpp similarity index 68% rename from velox/exec/tests/QueryTraceTest.cpp rename to velox/exec/tests/OperatorTraceTest.cpp index 71b49da483a03..667b16435dbe3 100644 --- a/velox/exec/tests/QueryTraceTest.cpp +++ b/velox/exec/tests/OperatorTraceTest.cpp @@ -16,26 +16,25 @@ #include #include -#include #include +#include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryDataReader.h" -#include "velox/exec/QueryDataWriter.h" -#include "velox/exec/QueryMetadataReader.h" -#include "velox/exec/QueryMetadataWriter.h" -#include "velox/exec/QueryTraceUtil.h" -#include "velox/exec/tests/utils/ArbitratorTestUtil.h" +#include "velox/exec/TaskTraceReader.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/serializers/PrestoSerializer.h" -#include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox::exec::test; namespace facebook::velox::exec::trace::test { -class QueryTracerTest : public HiveConnectorTestBase { +class OperatorTraceTest : public HiveConnectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); @@ -55,6 +54,11 @@ class QueryTracerTest : public HiveConnectorTestBase { registerPartitionFunctionSerDe(); } + void SetUp() override { + HiveConnectorTestBase::SetUp(); + dataType_ = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); + } + static VectorFuzzer::Options getFuzzerOptions() { return VectorFuzzer::Options{ .vectorSize = 16, @@ -65,7 +69,7 @@ class QueryTracerTest : public HiveConnectorTestBase { }; } - QueryTracerTest() : vectorFuzzer_{getFuzzerOptions(), pool_.get()} { + OperatorTraceTest() : vectorFuzzer_{getFuzzerOptions(), pool_.get()} { filesystems::registerLocalFileSystem(); } @@ -99,32 +103,64 @@ class QueryTracerTest : public HiveConnectorTestBase { return true; } + std::unique_ptr driverCtx() { + return std::make_unique(nullptr, 0, 0, 0, 0); + } + + RowTypePtr dataType_; VectorFuzzer vectorFuzzer_; }; -TEST_F(QueryTracerTest, emptyTrace) { - const auto outputDir = TempDirectoryPath::create(); - auto writer = trace::QueryDataWriter( - outputDir->getPath(), pool(), [&](uint64_t bytes) { return false; }); - writer.finish(); - - const auto fs = filesystems::getFileSystem(outputDir->getPath(), nullptr); - const auto summaryFile = fs->openFileForRead(fmt::format( - "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataSummaryFileName)); - const auto summary = summaryFile->pread(0, summaryFile->size()); - ASSERT_FALSE(summary.empty()); - folly::dynamic obj = folly::parseJson(summary); - ASSERT_EQ(obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), false); +TEST_F(OperatorTraceTest, emptyTrace) { + auto input = vectorFuzzer_.fuzzInputRow(dataType_); + input->childAt(0) = + makeFlatVector(input->size(), [](auto /*unused*/) { return 0; }); + createDuckDbTable({input}); + + std::string planNodeId; + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .values({input}) + .filter("a > 0") + .singleAggregation({"a"}, {"count(1)"}) + .capturePlanNodeId(planNodeId) + .planNode(); + + const auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, planNodeId) + .assertResults("SELECT a, count(1) FROM tmp WHERE a > 0 GROUP BY 1"); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + planNodeId, + /*pipelineId=*/0, + /*driverId=*/0); + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + ASSERT_EQ(summary.exceededTraceLimit, false); + ASSERT_EQ(summary.inputRows, 0); + ASSERT_EQ(summary.opType, "HashAggregation"); + // The hash aggregation operator might allocate memory when prepare output + // buffer even though there is no output. We could optimize out this later if + // needs. + ASSERT_GT(summary.peakMemory, 0); } -TEST_F(QueryTracerTest, traceData) { - const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); +TEST_F(OperatorTraceTest, traceData) { std::vector inputVectors; constexpr auto numBatch = 5; inputVectors.reserve(numBatch); for (auto i = 0; i < numBatch; ++i) { - inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType)); + inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(dataType_)); } + createDuckDbTable(inputVectors); struct { uint64_t maxTracedBytes; @@ -143,37 +179,58 @@ TEST_F(QueryTracerTest, traceData) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); - const auto outputDir = TempDirectoryPath::create(); - // Ensure the writer only write one batch. - uint64_t numTracedBytes{0}; - auto writer = trace::QueryDataWriter( - outputDir->getPath(), pool(), [&](uint64_t bytes) { - numTracedBytes += bytes; - return numTracedBytes >= testData.maxTracedBytes; - }); - for (auto i = 0; i < numBatch; ++i) { - writer.write(inputVectors[i]); - } - writer.finish(); - - const auto fs = filesystems::getFileSystem(outputDir->getPath(), nullptr); - const auto summaryFile = fs->openFileForRead(fmt::format( - "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataSummaryFileName)); - const auto summary = summaryFile->pread(0, summaryFile->size()); - ASSERT_FALSE(summary.empty()); - folly::dynamic obj = folly::parseJson(summary); - ASSERT_EQ( - obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), - testData.limitExceeded); + std::string planNodeId; + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .values(inputVectors) + .singleAggregation({"a"}, {"count(1)"}) + .capturePlanNodeId(planNodeId) + .planNode(); + + const auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config( + core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, planNodeId) + .assertResults("SELECT a, count(1) FROM tmp GROUP BY 1"); + + const auto fs = + filesystems::getFileSystem(traceDirPath->getPath(), nullptr); + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + planNodeId, + /*pipelineId=*/0, + /*driverId=*/0); + const auto summaryFilePath = getOpTraceSummaryFilePath(opTraceDir); + const auto dataFilePath = getOpTraceInputFilePath(opTraceDir); if (testData.maxTracedBytes == 0) { - const auto dataFile = fs->openFileForRead(fmt::format( - "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataFileName)); - ASSERT_EQ(dataFile->size(), 0); + ASSERT_FALSE(fs->exists(summaryFilePath)); + ASSERT_FALSE(fs->exists(dataFilePath)); continue; + } else { + ASSERT_TRUE(fs->exists(summaryFilePath)); + ASSERT_TRUE(fs->exists(dataFilePath)); + } + + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + ASSERT_EQ(summary.exceededTraceLimit, testData.limitExceeded); + ASSERT_EQ(summary.opType, "HashAggregation"); + + ASSERT_GT(summary.peakMemory, 0); + if (testData.limitExceeded) { + ASSERT_GT(summary.inputRows, 0); + } else { + ASSERT_EQ(summary.inputRows, testData.numTracedBatches * 16); } - const auto reader = QueryDataReader(outputDir->getPath(), rowType, pool()); + const auto reader = OperatorTraceInputReader(opTraceDir, dataType_, pool()); RowVectorPtr actual; size_t numOutputVectors{0}; while (reader.read(actual)) { @@ -189,7 +246,7 @@ TEST_F(QueryTracerTest, traceData) { } } -TEST_F(QueryTracerTest, traceMetadata) { +TEST_F(OperatorTraceTest, traceMetadata) { const auto rowType = ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); @@ -234,14 +291,14 @@ TEST_F(QueryTracerTest, traceMetadata) { executor_.get(), core::QueryConfig(expectedQueryConfigs), expectedConnectorProperties); - auto writer = trace::QueryMetadataWriter(outputDir->getPath(), pool()); + auto writer = trace::TaskTraceMetadataWriter(outputDir->getPath(), pool()); writer.write(queryCtx, planNode); std::unordered_map acutalQueryConfigs; std::unordered_map> actualConnectorProperties; core::PlanNodePtr actualQueryPlan; - auto reader = trace::QueryMetadataReader(outputDir->getPath(), pool()); + auto reader = trace::TaskTraceMetadataReader(outputDir->getPath(), pool()); reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan); ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); @@ -262,7 +319,7 @@ TEST_F(QueryTracerTest, traceMetadata) { } } -TEST_F(QueryTracerTest, task) { +TEST_F(OperatorTraceTest, task) { const auto rowType = ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); @@ -345,7 +402,8 @@ TEST_F(QueryTracerTest, task) { } ASSERT_EQ(actaulDirs.size(), testData.expectedNumDirs); ASSERT_EQ(actaulDirs.at(0), expectedDir); - const auto taskIds = getTaskIds(outputDir->getPath(), fs); + const auto taskIds = + getTaskIds(outputDir->getPath(), task->queryCtx()->queryId(), fs); ASSERT_EQ(taskIds.size(), testData.expectedNumDirs); ASSERT_EQ(taskIds.at(0), task->taskId()); @@ -354,7 +412,7 @@ TEST_F(QueryTracerTest, task) { unordered_map> actualConnectorProperties; core::PlanNodePtr actualQueryPlan; - auto reader = trace::QueryMetadataReader(expectedDir, pool()); + auto reader = trace::TaskTraceMetadataReader(expectedDir, pool()); reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan); ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); @@ -377,7 +435,35 @@ TEST_F(QueryTracerTest, task) { } } -TEST_F(QueryTracerTest, error) { +TEST_F(OperatorTraceTest, nonExistTraceNodeId) { + auto input = vectorFuzzer_.fuzzInputRow(dataType_); + createDuckDbTable({input}); + + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .values({input}) + .filter("a > 0") + .singleAggregation({"a"}, {"count(1)"}) + .planNode(); + + const std::string nonExistPlanNodeId("nonExistPlanNodeId"); + const auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, nonExistPlanNodeId) + .assertResults("SELECT a, count(1) FROM tmp WHERE a > 0 GROUP BY 1"); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + auto fs = filesystems::getFileSystem(taskTraceDir, nullptr); + ASSERT_FALSE(fs->exists(taskTraceDir)); +} + +TEST_F(OperatorTraceTest, error) { const auto planNode = PlanBuilder().values({}).planNode(); const auto expectedQueryConfigs = std::unordered_map{ @@ -393,54 +479,7 @@ TEST_F(QueryTracerTest, error) { "Query trace enabled but the trace dir is not set"); } -TEST_F(QueryTracerTest, traceDir) { - const auto outputDir = TempDirectoryPath::create(); - const auto rootDir = outputDir->getPath(); - const auto fs = filesystems::getFileSystem(rootDir, nullptr); - auto dir1 = fmt::format("{}/{}", outputDir->getPath(), "t1"); - trace::createTraceDirectory(dir1); - ASSERT_TRUE(fs->exists(dir1)); - - auto dir2 = fmt::format("{}/{}", dir1, "t1_1"); - trace::createTraceDirectory(dir2); - ASSERT_TRUE(fs->exists(dir2)); - - // It will remove the old dir1 along with its subdir when created the dir1 - // again. - trace::createTraceDirectory(dir1); - ASSERT_TRUE(fs->exists(dir1)); - ASSERT_FALSE(fs->exists(dir2)); - - const auto parentDir = fmt::format("{}/{}", outputDir->getPath(), "p"); - fs->mkdir(parentDir); - - constexpr auto numThreads = 5; - std::vector traceThreads; - traceThreads.reserve(numThreads); - std::mutex mutex; - std::set expectedDirs; - for (int i = 0; i < numThreads; ++i) { - traceThreads.emplace_back([&, i]() { - const auto dir = fmt::format("{}/s{}", parentDir, i); - trace::createTraceDirectory(dir); - std::lock_guard l(mutex); - expectedDirs.insert(dir); - }); - } - - for (auto& traceThread : traceThreads) { - traceThread.join(); - } - - const auto actualDirs = fs->list(parentDir); - ASSERT_EQ(actualDirs.size(), numThreads); - ASSERT_EQ(actualDirs.size(), expectedDirs.size()); - for (const auto& dir : actualDirs) { - ASSERT_EQ(expectedDirs.count(dir), 1); - } -} - -TEST_F(QueryTracerTest, traceTableWriter) { +TEST_F(OperatorTraceTest, traceTableWriter) { const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); std::vector inputVectors; constexpr auto numBatch = 5; @@ -490,8 +529,8 @@ TEST_F(QueryTracerTest, traceTableWriter) { .config(core::QueryConfig::kQueryTraceNodeIds, "1") .copyResults(pool(), task); - const auto metadataDir = fmt::format("{}/{}", traceRoot, task->taskId()); - const auto fs = filesystems::getFileSystem(metadataDir, nullptr); + const auto taskTraceDir = getTaskTraceDirectory(traceRoot, *task); + const auto fs = filesystems::getFileSystem(taskTraceDir, nullptr); if (testData.taskRegExpr == "wrong id") { ASSERT_FALSE(fs->exists(traceRoot)); @@ -499,34 +538,24 @@ TEST_F(QueryTracerTest, traceTableWriter) { } // Query metadta file should exist. - const auto traceMetaFile = fmt::format( - "{}/{}/{}", - traceRoot, - task->taskId(), - trace::QueryTraceTraits::kQueryMetaFileName); - ASSERT_TRUE(fs->exists(traceMetaFile)); + const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir); + ASSERT_TRUE(fs->exists(traceMetaFilePath)); - const auto dataDir = - fmt::format("{}/{}/{}", traceRoot, task->taskId(), "1/0/0/data"); + const auto opTraceDir = getOpTraceDirectory(taskTraceDir, "1", 0, 0); // Query data tracing disabled. if (testData.maxTracedBytes == 0) { - ASSERT_FALSE(fs->exists(dataDir)); + ASSERT_FALSE(fs->exists(opTraceDir)); continue; } - ASSERT_EQ(fs->list(dataDir).size(), 2); - // Check data summaries. - const auto summaryFile = fs->openFileForRead( - fmt::format("{}/{}", dataDir, QueryTraceTraits::kDataSummaryFileName)); - const auto summary = summaryFile->pread(0, summaryFile->size()); - ASSERT_FALSE(summary.empty()); - folly::dynamic obj = folly::parseJson(summary); - ASSERT_EQ( - obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), - testData.limitExceeded); + ASSERT_EQ(fs->list(opTraceDir).size(), 2); + + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + ASSERT_EQ(summary.exceededTraceLimit, testData.limitExceeded); - const auto reader = trace::QueryDataReader(dataDir, rowType, pool()); + const auto reader = + trace::OperatorTraceInputReader(opTraceDir, rowType, pool()); RowVectorPtr actual; size_t numOutputVectors{0}; while (reader.read(actual)) { diff --git a/velox/exec/tests/TraceUtilTest.cpp b/velox/exec/tests/TraceUtilTest.cpp new file mode 100644 index 0000000000000..2ed4d509e7447 --- /dev/null +++ b/velox/exec/tests/TraceUtilTest.cpp @@ -0,0 +1,188 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" + +using namespace facebook::velox::exec::test; + +namespace facebook::velox::exec::trace::test { +class TraceUtilTest : public testing::Test { + protected: + static void SetUpTestCase() { + filesystems::registerLocalFileSystem(); + } +}; + +TEST_F(TraceUtilTest, traceDir) { + const auto outputDir = TempDirectoryPath::create(); + const auto rootDir = outputDir->getPath(); + const auto fs = filesystems::getFileSystem(rootDir, nullptr); + auto dir1 = fmt::format("{}/{}", outputDir->getPath(), "t1"); + trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + + auto dir2 = fmt::format("{}/{}", dir1, "t1_1"); + trace::createTraceDirectory(dir2); + ASSERT_TRUE(fs->exists(dir2)); + + // It will remove the old dir1 along with its subdir when created the dir1 + // again. + trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + ASSERT_FALSE(fs->exists(dir2)); + + const auto parentDir = fmt::format("{}/{}", outputDir->getPath(), "p"); + fs->mkdir(parentDir); + + constexpr auto numThreads = 5; + std::vector traceThreads; + traceThreads.reserve(numThreads); + std::mutex mutex; + std::set expectedDirs; + for (int i = 0; i < numThreads; ++i) { + traceThreads.emplace_back([&, i]() { + const auto dir = fmt::format("{}/s{}", parentDir, i); + trace::createTraceDirectory(dir); + std::lock_guard l(mutex); + expectedDirs.insert(dir); + }); + } + + for (auto& traceThread : traceThreads) { + traceThread.join(); + } + + const auto actualDirs = fs->list(parentDir); + ASSERT_EQ(actualDirs.size(), numThreads); + ASSERT_EQ(actualDirs.size(), expectedDirs.size()); + for (const auto& dir : actualDirs) { + ASSERT_EQ(expectedDirs.count(dir), 1); + } +} + +TEST_F(TraceUtilTest, OperatorTraceSummary) { + exec::trace::OperatorTraceSummary summary; + summary.opType = "summary"; + summary.exceededTraceLimit = true; + summary.inputRows = 100; + summary.peakMemory = 200; + ASSERT_EQ( + summary.toString(), + "opType summary, exceededTraceLimit true, inputRows 100, peakMemory 200B"); +} + +TEST_F(TraceUtilTest, traceDirectoryLayoutUtilities) { + const std::string traceRoot = "/traceRoot"; + const std::string queryId = "queryId"; + ASSERT_EQ( + getQueryTraceDirectory(traceRoot, queryId), + fmt::format("{}/{}", traceRoot, queryId)); + const std::string taskId = "taskId"; + const std::string taskTraceDir = + getTaskTraceDirectory(traceRoot, queryId, taskId); + ASSERT_EQ(taskTraceDir, fmt::format("{}/{}/{}", traceRoot, queryId, taskId)); + ASSERT_EQ( + getTaskTraceMetaFilePath( + getTaskTraceDirectory(traceRoot, queryId, taskId)), + "/traceRoot/queryId/taskId/task_trace_meta.json"); + const std::string nodeId = "1"; + const std::string nodeTraceDir = getNodeTraceDirectory(taskTraceDir, nodeId); + ASSERT_EQ(nodeTraceDir, "/traceRoot/queryId/taskId/1"); + const uint32_t pipelineId = 1; + ASSERT_EQ( + getPipelineTraceDirectory(nodeTraceDir, pipelineId), + "/traceRoot/queryId/taskId/1/1"); + const uint32_t driverId = 1; + const std::string opTraceDir = + getOpTraceDirectory(taskTraceDir, nodeId, pipelineId, driverId); + ASSERT_EQ(opTraceDir, "/traceRoot/queryId/taskId/1/1/1"); + ASSERT_EQ( + getOpTraceDirectory(nodeTraceDir, pipelineId, driverId), + "/traceRoot/queryId/taskId/1/1/1"); + ASSERT_EQ( + getOpTraceInputFilePath(opTraceDir), + "/traceRoot/queryId/taskId/1/1/1/op_input_trace.data"); + ASSERT_EQ( + getOpTraceSummaryFilePath(opTraceDir), + "/traceRoot/queryId/taskId/1/1/1/op_trace_summary.json"); +} + +TEST_F(TraceUtilTest, getTaskIds) { + const auto rootDir = TempDirectoryPath::create(); + const auto rootPath = rootDir->getPath(); + const auto fs = filesystems::getFileSystem(rootPath, nullptr); + const std::string queryId = "queryId"; + fs->mkdir(trace::getQueryTraceDirectory(rootPath, queryId)); + ASSERT_TRUE(getTaskIds(rootPath, queryId, fs).empty()); + const std::string taskId1 = "task1"; + fs->mkdir(trace::getTaskTraceDirectory(rootPath, queryId, taskId1)); + const std::string taskId2 = "task2"; + fs->mkdir(trace::getTaskTraceDirectory(rootPath, queryId, taskId2)); + auto taskIds = getTaskIds(rootPath, queryId, fs); + ASSERT_EQ(taskIds.size(), 2); + std::set taskIdSet({taskId1, taskId2}); + ASSERT_EQ(*taskIds.begin(), taskId1); + ASSERT_EQ(*taskIds.rbegin(), taskId2); +} + +TEST_F(TraceUtilTest, getDriverIds) { + const auto rootDir = TempDirectoryPath::create(); + const auto rootPath = rootDir->getPath(); + const auto fs = filesystems::getFileSystem(rootPath, nullptr); + const std::string queryId = "queryId"; + fs->mkdir(trace::getQueryTraceDirectory(rootPath, queryId)); + ASSERT_TRUE(getTaskIds(rootPath, queryId, fs).empty()); + const std::string taskId = "task"; + const std::string taskTraceDir = + trace::getTaskTraceDirectory(rootPath, queryId, taskId); + fs->mkdir(taskTraceDir); + const std::string nodeId = "node"; + const std::string nodeTraceDir = + trace::getNodeTraceDirectory(taskTraceDir, nodeId); + fs->mkdir(nodeTraceDir); + const uint32_t pipelineId = 1; + fs->mkdir(trace::getPipelineTraceDirectory(nodeTraceDir, pipelineId)); + ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 0); + ASSERT_TRUE(listDriverIds(nodeTraceDir, pipelineId, fs).empty()); + // create 3 drivers. + const uint32_t driverId1 = 1; + fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId1)); + const uint32_t driverId2 = 2; + fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId2)); + const uint32_t driverId3 = 3; + fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId3)); + ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 3); + auto driverIds = listDriverIds(nodeTraceDir, pipelineId, fs); + ASSERT_EQ(driverIds.size(), 3); + std::sort(driverIds.begin(), driverIds.end()); + ASSERT_EQ(driverIds[0], driverId1); + ASSERT_EQ(driverIds[1], driverId2); + ASSERT_EQ(driverIds[2], driverId3); + // Bad driver id. + const std::string BadDriverId = "badDriverId"; + fs->mkdir(fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, BadDriverId)); + ASSERT_ANY_THROW(getNumDrivers(nodeTraceDir, pipelineId, fs)); + ASSERT_ANY_THROW(listDriverIds(nodeTraceDir, pipelineId, fs)); +} +} // namespace facebook::velox::exec::trace::test diff --git a/velox/exec/tests/utils/OperatorTestBase.h b/velox/exec/tests/utils/OperatorTestBase.h index 4e5def101cfaa..079bfc2ff6b0e 100644 --- a/velox/exec/tests/utils/OperatorTestBase.h +++ b/velox/exec/tests/utils/OperatorTestBase.h @@ -24,8 +24,6 @@ #include "velox/exec/HashProbe.h" #include "velox/exec/tests/utils/QueryAssertions.h" #include "velox/parse/ExpressionsParser.h" -#include "velox/type/Variant.h" -#include "velox/vector/FlatVector.h" #include "velox/vector/tests/utils/VectorMaker.h" #include "velox/vector/tests/utils/VectorTestBase.h" diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 57d12d57bd137..5deed1140f4a0 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -222,9 +222,10 @@ PlanBuilder& PlanBuilder::values( PlanBuilder& PlanBuilder::traceScan( const std::string& traceNodeDir, + uint32_t pipelineId, const RowTypePtr& outputType) { - planNode_ = std::make_shared( - nextPlanNodeId(), traceNodeDir, outputType); + planNode_ = std::make_shared( + nextPlanNodeId(), traceNodeDir, pipelineId, outputType); return *this; } diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 341c25b067671..5af2e9f91ff7e 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -322,9 +322,12 @@ class PlanBuilder { /// Adds a QueryReplayNode for query tracing. /// /// @param traceNodeDir The trace directory for a given plan node. + /// @param pipelineId The pipeline id for the traced operator instantiated + /// from the given plan node. /// @param outputType The type of the tracing data. PlanBuilder& traceScan( const std::string& traceNodeDir, + uint32_t pipelineId, const RowTypePtr& outputType); /// Add an ExchangeNode. diff --git a/velox/tool/trace/AggregationReplayer.cpp b/velox/tool/trace/AggregationReplayer.cpp index 54041bf8bbcbd..1c397461a8a5f 100644 --- a/velox/tool/trace/AggregationReplayer.cpp +++ b/velox/tool/trace/AggregationReplayer.cpp @@ -15,7 +15,7 @@ */ #include "velox/tool/trace/AggregationReplayer.h" -#include "velox/exec/QueryDataReader.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/tests/utils/PlanBuilder.h" using namespace facebook::velox; diff --git a/velox/tool/trace/AggregationReplayer.h b/velox/tool/trace/AggregationReplayer.h index 21ce2c409acf6..9a14688a694a1 100644 --- a/velox/tool/trace/AggregationReplayer.h +++ b/velox/tool/trace/AggregationReplayer.h @@ -24,13 +24,15 @@ namespace facebook::velox::tool::trace { class AggregationReplayer : public OperatorReplayerBase { public: AggregationReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, const std::string& operatorType) : OperatorReplayerBase( - rootDir, + traceDir, + queryId, taskId, nodeId, pipelineId, diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 3218b4ee30b0b..8c68c9967db31 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -33,7 +33,7 @@ velox_link_libraries( glog::glog gflags::gflags) -add_executable(velox_query_replayer QueryReplayer.cpp) +add_executable(velox_query_replayer TraceReplayerMain.cpp) target_link_libraries( velox_query_replayer diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index b54ad323a4471..2083e676f6781 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -16,11 +16,9 @@ #include -#include "velox/common/serialization/Serializable.h" #include "velox/core/PlanNode.h" -#include "velox/exec/QueryMetadataReader.h" -#include "velox/exec/QueryTraceTraits.h" -#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TaskTraceReader.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/OperatorReplayerBase.h" @@ -29,30 +27,31 @@ using namespace facebook::velox; namespace facebook::velox::tool::trace { OperatorReplayerBase::OperatorReplayerBase( - std::string rootDir, + std::string traceDir, + std::string queryId, std::string taskId, std::string nodeId, int32_t pipelineId, std::string operatorType) - : taskId_(std::move(taskId)), + : queryId_(std::string(queryId)), + taskId_(std::move(taskId)), nodeId_(std::move(nodeId)), pipelineId_(pipelineId), operatorType_(std::move(operatorType)), - rootDir_(std::move(rootDir)), - taskDir_(fmt::format("{}/{}", rootDir_, taskId_)), - nodeDir_(fmt::format("{}/{}", taskDir_, nodeId_)) { - VELOX_USER_CHECK(!rootDir_.empty()); + taskTraceDir_( + exec::trace::getTaskTraceDirectory(traceDir, queryId_, taskId_)), + nodeTraceDir_(exec::trace::getNodeTraceDirectory(taskTraceDir_, nodeId_)), + fs_(filesystems::getFileSystem(taskTraceDir_, nullptr)), + maxDrivers_(exec::trace::getNumDrivers(nodeTraceDir_, pipelineId_, fs_)) { + VELOX_USER_CHECK(!taskTraceDir_.empty()); VELOX_USER_CHECK(!taskId_.empty()); VELOX_USER_CHECK(!nodeId_.empty()); VELOX_USER_CHECK_GE(pipelineId_, 0); VELOX_USER_CHECK(!operatorType_.empty()); - const auto metadataReader = exec::trace::QueryMetadataReader( - taskDir_, memory::MemoryManager::getInstance()->tracePool()); - metadataReader.read(queryConfigs_, connectorConfigs_, planFragment_); + const auto taskMetaReader = exec::trace::TaskTraceMetadataReader( + taskTraceDir_, memory::MemoryManager::getInstance()->tracePool()); + taskMetaReader.read(queryConfigs_, connectorConfigs_, planFragment_); queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false"; - fs_ = filesystems::getFileSystem(rootDir_, nullptr); - maxDrivers_ = - exec::trace::getNumDrivers(rootDir_, taskId_, nodeId_, pipelineId_, fs_); } RowVectorPtr OperatorReplayerBase::run() { @@ -69,7 +68,10 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { planFragment_.get(), [this](const core::PlanNode* node) { return node->id() == nodeId_; }); return exec::test::PlanBuilder() - .traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_)) + .traceScan( + nodeTraceDir_, + pipelineId_, + exec::trace::getDataType(planFragment_, nodeId_)) .addNode(replayNodeFactory(replayNode)) .planNode(); } diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index 863a8f5f80f69..50e31fdaf02f5 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -27,7 +27,8 @@ namespace facebook::velox::tool::trace { class OperatorReplayerBase { public: OperatorReplayerBase( - std::string rootDir, + std::string traceDir, + std::string queryId, std::string taskId, std::string nodeId, int32_t pipelineId, @@ -50,21 +51,21 @@ class OperatorReplayerBase { core::PlanNodePtr createPlan() const; + const std::string queryId_; const std::string taskId_; const std::string nodeId_; const int32_t pipelineId_; const std::string operatorType_; - const std::string rootDir_; - const std::string taskDir_; - const std::string nodeDir_; + const std::string taskTraceDir_; + const std::string nodeTraceDir_; + const std::shared_ptr fs_; + const int32_t maxDrivers_; std::unordered_map queryConfigs_; std::unordered_map> connectorConfigs_; core::PlanNodePtr planFragment_; - std::shared_ptr fs_; - int32_t maxDrivers_{1}; private: std::function diff --git a/velox/tool/trace/PartitionedOutputReplayer.cpp b/velox/tool/trace/PartitionedOutputReplayer.cpp index 79874caefc93a..56b4c0a145eb7 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.cpp +++ b/velox/tool/trace/PartitionedOutputReplayer.cpp @@ -18,7 +18,7 @@ #include "velox/common/memory/Memory.h" #include "velox/exec/PartitionedOutput.h" -#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/PartitionedOutputReplayer.h" @@ -105,13 +105,20 @@ void consumeAllData( } PartitionedOutputReplayer::PartitionedOutputReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, const std::string& operatorType, const ConsumerCallBack& consumerCb) - : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType), + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType), originalNode_(dynamic_cast( core::PlanNode::findFirstNode( planFragment_.get(), diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index 61610aa031ccb..a90c222dbf657 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -41,7 +41,8 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { std::function)>; PartitionedOutputReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, diff --git a/velox/tool/trace/TableWriterReplayer.cpp b/velox/tool/trace/TableWriterReplayer.cpp index 3543011aed9fb..405071da433e3 100644 --- a/velox/tool/trace/TableWriterReplayer.cpp +++ b/velox/tool/trace/TableWriterReplayer.cpp @@ -16,8 +16,8 @@ #include -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/TableWriterReplayer.h" diff --git a/velox/tool/trace/TableWriterReplayer.h b/velox/tool/trace/TableWriterReplayer.h index c0c675ba63335..8684be91478d7 100644 --- a/velox/tool/trace/TableWriterReplayer.h +++ b/velox/tool/trace/TableWriterReplayer.h @@ -27,13 +27,20 @@ namespace facebook::velox::tool::trace { class TableWriterReplayer final : public OperatorReplayerBase { public: TableWriterReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, const std::string& operatorType, const std::string& replayOutputDir) - : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType), + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType), replayOutputDir_(replayOutputDir) { VELOX_CHECK(!replayOutputDir_.empty()); } diff --git a/velox/tool/trace/QueryReplayer.cpp b/velox/tool/trace/TraceReplayRunner.cpp similarity index 57% rename from velox/tool/trace/QueryReplayer.cpp rename to velox/tool/trace/TraceReplayRunner.cpp index a0680d30a387a..b2cd0d4a0be3b 100644 --- a/velox/tool/trace/QueryReplayer.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -14,7 +14,8 @@ * limitations under the License. */ -#include +#include "velox/tool/trace/TraceReplayRunner.h" + #include #include "velox/common/file/FileSystems.h" @@ -32,12 +33,12 @@ #include "velox/dwio/dwrf/RegisterDwrfWriter.h" #include "velox/dwio/parquet/RegisterParquetReader.h" #include "velox/dwio/parquet/RegisterParquetWriter.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryTraceUtil.h" -#include "velox/expression/Expr.h" +#include "velox/exec/TaskTraceReader.h" +#include "velox/exec/TraceUtil.h" #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" -#include "velox/parse/ExpressionsParser.h" #include "velox/parse/TypeResolver.h" #include "velox/tool/trace/AggregationReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" @@ -57,6 +58,7 @@ DEFINE_bool( "It also print the query metadata including query configs, connectors " "properties, and query plan in JSON format."); DEFINE_bool(short_summary, false, "Only show number of tasks and task ids"); +DEFINE_string(query_id, "", "Specify the target query id which must be set"); DEFINE_string( task_id, "", @@ -74,61 +76,18 @@ DEFINE_double( 2.0, "Hardware multipler for hive connector."); -using namespace facebook::velox; - +namespace facebook::velox::tool::trace { namespace { -void init() { - memory::initializeMemoryManager({}); - filesystems::registerLocalFileSystem(); - filesystems::registerS3FileSystem(); - filesystems::registerHdfsFileSystem(); - filesystems::registerGCSFileSystem(); - filesystems::abfs::registerAbfsFileSystem(); - - dwio::common::registerFileSinks(); - dwrf::registerDwrfReaderFactory(); - dwrf::registerDwrfWriterFactory(); - parquet::registerParquetReaderFactory(); - parquet::registerParquetWriterFactory(); - - core::PlanNode::registerSerDe(); - core::ITypedExpr::registerSerDe(); - common::Filter::registerSerDe(); - Type::registerSerDe(); - exec::registerPartitionFunctionSerDe(); - if (!isRegisteredVectorSerde()) { - serializer::presto::PrestoVectorSerde::registerVectorSerde(); - } - connector::hive::HiveTableHandle::registerSerDe(); - connector::hive::LocationHandle::registerSerDe(); - connector::hive::HiveColumnHandle::registerSerDe(); - connector::hive::HiveInsertTableHandle::registerSerDe(); - connector::hive::HiveConnectorSplit::registerSerDe(); - - functions::prestosql::registerAllScalarFunctions(); - aggregate::prestosql::registerAllAggregateFunctions(); - parse::registerTypeResolver(); - - // TODO: make it configurable. - const auto ioExecutor = std::make_unique( - std::thread::hardware_concurrency() * - FLAGS_hiveConnectorExecutorHwMultiplier); - connector::registerConnectorFactory( - std::make_shared()); - const auto hiveConnector = - connector::getConnectorFactory("hive")->newConnector( - "test-hive", - std::make_shared( - std::unordered_map()), - ioExecutor.get()); - connector::registerConnector(hiveConnector); -} std::unique_ptr createReplayer() { std::unique_ptr replayer; if (FLAGS_operator_type == "TableWriter") { + VELOX_USER_CHECK( + !FLAGS_table_writer_output_dir.empty(), + "--table_writer_output_dir is required"); replayer = std::make_unique( FLAGS_root_dir, + FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, @@ -137,6 +96,7 @@ std::unique_ptr createReplayer() { } else if (FLAGS_operator_type == "Aggregation") { replayer = std::make_unique( FLAGS_root_dir, + FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, @@ -144,6 +104,7 @@ std::unique_ptr createReplayer() { } else if (FLAGS_operator_type == "PartitionedOutput") { replayer = std::make_unique( FLAGS_root_dir, + FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, @@ -155,76 +116,175 @@ std::unique_ptr createReplayer() { return replayer; } +void printTaskMetadata( + const std::string& taskTraceDir, + memory::MemoryPool* pool, + std::ostringstream& oss) { + auto taskMetaReader = std::make_unique( + taskTraceDir, pool); + std::unordered_map queryConfigs; + std::unordered_map> + connectorProperties; + core::PlanNodePtr queryPlan; + taskMetaReader->read(queryConfigs, connectorProperties, queryPlan); + + oss << "\n++++++Query configs++++++\n"; + for (const auto& queryConfigEntry : queryConfigs) { + oss << "\t" << queryConfigEntry.first << ": " << queryConfigEntry.second + << "\n"; + } + oss << "\n++++++Connector configs++++++\n"; + for (const auto& connectorPropertyEntry : connectorProperties) { + oss << connectorPropertyEntry.first << "\n"; + for (const auto& propertyEntry : connectorPropertyEntry.second) { + oss << "\t" << propertyEntry.first << ": " << propertyEntry.second + << "\n"; + } + } + oss << "\n++++++Task query plan++++++\n"; + oss << queryPlan->toString(true, true); +} + +void printTaskTraceSummary( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId, + const std::string& nodeId, + uint32_t pipelineId, + memory::MemoryPool* pool, + std::ostringstream& oss) { + auto fs = filesystems::getFileSystem(traceDir, nullptr); + const auto taskTraceDir = + exec::trace::getTaskTraceDirectory(traceDir, queryId, taskId); + + const std::vector driverIds = exec::trace::listDriverIds( + exec::trace::getNodeTraceDirectory(taskTraceDir, nodeId), pipelineId, fs); + oss << "\n++++++Task " << taskId << "++++++\n"; + for (const auto& driverId : driverIds) { + const auto opTraceDir = exec::trace::getOpTraceDirectory( + taskTraceDir, nodeId, pipelineId, driverId); + const auto opTraceSummary = + exec::trace::OperatorTraceSummaryReader( + exec::trace::getOpTraceDirectory( + taskTraceDir, nodeId, pipelineId, driverId), + pool) + .read(); + oss << driverId << " driver, " << opTraceSummary.toString() << "\n"; + } +} + void printSummary( const std::string& rootDir, + const std::string& queryId, const std::string& taskId, - bool shortSummary) { - const auto fs = filesystems::getFileSystem(rootDir, nullptr); - const auto taskIds = exec::trace::getTaskIds(rootDir, fs); - if (taskIds.empty()) { - LOG(ERROR) << "No traced query task under " << rootDir; - return; - } + bool shortSummary, + memory::MemoryPool* pool) { + const std::string queryDir = + exec::trace::getQueryTraceDirectory(rootDir, queryId); + const auto fs = filesystems::getFileSystem(queryDir, nullptr); + const auto taskIds = exec::trace::getTaskIds(rootDir, queryId, fs); + VELOX_USER_CHECK(!taskIds.empty(), "No task found under {}", rootDir); std::ostringstream summary; summary << "\n++++++Query trace summary++++++\n"; summary << "Number of tasks: " << taskIds.size() << "\n"; - summary << "Task ids: " << folly::join(",", taskIds); - if (shortSummary) { + summary << "Task ids: " << folly::join("\n", taskIds); LOG(INFO) << summary.str(); return; } const auto summaryTaskIds = taskId.empty() ? taskIds : std::vector{taskId}; + printTaskMetadata( + exec::trace::getTaskTraceDirectory(rootDir, queryId, summaryTaskIds[0]), + pool, + summary); + summary << "\n++++++Task Summaries++++++\n"; for (const auto& taskId : summaryTaskIds) { - summary << "\n++++++Query configs and plan of task " << taskId - << ":++++++\n"; - const auto traceTaskDir = fmt::format("{}/{}", rootDir, taskId); - const auto queryMetaFile = fmt::format( - "{}/{}", - traceTaskDir, - exec::trace::QueryTraceTraits::kQueryMetaFileName); - const auto metaObj = exec::trace::getMetadata(queryMetaFile, fs); - const auto& configObj = - metaObj[exec::trace::QueryTraceTraits::kQueryConfigKey]; - summary << "++++++Query configs++++++\n"; - summary << folly::toJson(configObj) << "\n"; - summary << "++++++Query plan++++++\n"; - const auto queryPlan = ISerializable::deserialize( - metaObj[exec::trace::QueryTraceTraits::kPlanNodeKey], - memory::MemoryManager::getInstance()->tracePool()); - summary << queryPlan->toString(true, true); + printTaskTraceSummary( + rootDir, + queryId, + taskId, + FLAGS_node_id, + FLAGS_pipeline_id, + pool, + summary); } LOG(INFO) << summary.str(); } } // namespace -int main(int argc, char** argv) { - if (argc == 1) { - gflags::ShowUsageWithFlags(argv[0]); - return -1; - } +TraceReplayRunner::TraceReplayRunner() + : ioExecutor_(std::make_unique( + std::thread::hardware_concurrency() * + FLAGS_hiveConnectorExecutorHwMultiplier, + std::make_shared( + "TraceReplayIoConnector"))) {} + +void TraceReplayRunner::init() { + memory::initializeMemoryManager({}); + filesystems::registerLocalFileSystem(); + filesystems::registerS3FileSystem(); + filesystems::registerHdfsFileSystem(); + filesystems::registerGCSFileSystem(); + filesystems::abfs::registerAbfsFileSystem(); + + dwio::common::registerFileSinks(); + dwrf::registerDwrfReaderFactory(); + dwrf::registerDwrfWriterFactory(); + parquet::registerParquetReaderFactory(); + parquet::registerParquetWriterFactory(); - gflags::ParseCommandLineFlags(&argc, &argv, true); - if (FLAGS_root_dir.empty()) { - gflags::SetUsageMessage("--root_dir must be provided."); - gflags::ShowUsageWithFlags(argv[0]); - return -1; + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + common::Filter::registerSerDe(); + Type::registerSerDe(); + exec::registerPartitionFunctionSerDe(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); } + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); + connector::hive::registerHivePartitionFunctionSerDe(); + connector::hive::HiveBucketProperty::registerSerDe(); - try { - init(); - if (FLAGS_summary || FLAGS_short_summary) { - printSummary(FLAGS_root_dir, FLAGS_task_id, FLAGS_short_summary); - return 0; - } - createReplayer()->run(); - } catch (const VeloxException& e) { - LOG(ERROR) << e.what(); - return -1; + functions::prestosql::registerAllScalarFunctions(); + aggregate::prestosql::registerAllAggregateFunctions(); + parse::registerTypeResolver(); + + connector::registerConnectorFactory( + std::make_shared()); + const auto hiveConnector = + connector::getConnectorFactory("hive")->newConnector( + "test-hive", + std::make_shared( + std::unordered_map()), + ioExecutor_.get()); + connector::registerConnector(hiveConnector); +} + +void TraceReplayRunner::run() { + VELOX_USER_CHECK(!FLAGS_root_dir.empty(), "--root_dir must be provided"); + VELOX_USER_CHECK(!FLAGS_query_id.empty(), "--query_id must be provided"); + VELOX_USER_CHECK(!FLAGS_node_id.empty(), "--node_id must be provided"); + + if (FLAGS_summary || FLAGS_short_summary) { + auto pool = memory::memoryManager()->addLeafPool("replayer"); + printSummary( + FLAGS_root_dir, + FLAGS_query_id, + FLAGS_task_id, + FLAGS_short_summary, + pool.get()); + return; } - return 0; + VELOX_USER_CHECK( + !FLAGS_operator_type.empty(), "--operator_type must be provided"); + createReplayer()->run(); } +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h new file mode 100644 index 0000000000000..f1f1d7484f621 --- /dev/null +++ b/velox/tool/trace/TraceReplayRunner.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +DECLARE_string(root_dir); +DECLARE_bool(summary); +DECLARE_bool(short_summary); +DECLARE_string(query_id); +DECLARE_string(task_id); +DECLARE_string(node_id); +DECLARE_int32(pipeline_id); +DECLARE_string(operator_type); +DECLARE_string(table_writer_output_dir); +DECLARE_double(hiveConnectorExecutorHwMultiplier); + +namespace facebook::velox::tool::trace { + +/// The trace replay runner. It is configured through a set of gflags passed +/// from replayer tool command line. +class TraceReplayRunner { + public: + TraceReplayRunner(); + virtual ~TraceReplayRunner() = default; + + /// Initializes the trace replay runner by setting the velox runtime + /// environment for the trace replay. It is invoked before run(). + virtual void init(); + + /// Runs the trace replay with a set of gflags passed from replayer tool. + virtual void run(); + + private: + const std::unique_ptr ioExecutor_; +}; + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayerMain.cpp b/velox/tool/trace/TraceReplayerMain.cpp new file mode 100644 index 0000000000000..cfca543a04fa1 --- /dev/null +++ b/velox/tool/trace/TraceReplayerMain.cpp @@ -0,0 +1,28 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/TraceReplayRunner.h" + +#include + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + facebook::velox::tool::trace::TraceReplayRunner runner; + runner.init(); + runner.run(); + return 0; +} diff --git a/velox/tool/trace/tests/AggregationReplayerTest.cpp b/velox/tool/trace/tests/AggregationReplayerTest.cpp index 8d3b15abde620..55a71f739eece 100644 --- a/velox/tool/trace/tests/AggregationReplayerTest.cpp +++ b/velox/tool/trace/tests/AggregationReplayerTest.cpp @@ -26,10 +26,10 @@ #include "velox/common/hyperloglog/SparseHll.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryDataReader.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" @@ -198,10 +198,14 @@ TEST_F(AggregationReplayerTest, test) { .split(makeHiveConnectorSplit(sourceFilePath->getPath())) .copyResults(pool(), task); - const auto replayingResult = - AggregationReplayer( - traceRoot, task->taskId(), traceNodeId_, 0, "Aggregation") - .run(); + const auto replayingResult = AggregationReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + 0, + "Aggregation") + .run(); assertEqualResults({results}, {replayingResult}); } } diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index b34ead3fb3666..d2d9c2c4e49f3 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -24,8 +24,8 @@ #include "velox/common/hyperloglog/SparseHll.h" #include "velox/exec/PartitionFunction.h" #include "velox/exec/PartitionedOutput.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -143,10 +143,15 @@ TEST_F(PartitionedOutputReplayerTest, defaultConsumer) { executor_.get(), consumerExecutor.get(), [&](auto /* unused */, auto /* unused */) {}); - ASSERT_NO_THROW( - PartitionedOutputReplayer( - traceRoot, originalTask->taskId(), planNodeId, 0, "PartitionedOutput") - .run()); + + ASSERT_NO_THROW(PartitionedOutputReplayer( + traceRoot, + originalTask->queryCtx()->queryId(), + originalTask->taskId(), + planNodeId, + 0, + "PartitionedOutput") + .run()); } TEST_F(PartitionedOutputReplayerTest, basic) { @@ -223,6 +228,7 @@ TEST_F(PartitionedOutputReplayerTest, basic) { replayedPartitionedResults.resize(testParam.numPartitions); PartitionedOutputReplayer( traceRoot, + originalTask->queryCtx()->queryId(), originalTask->taskId(), planNodeId, 0, diff --git a/velox/tool/trace/tests/TableWriterReplayerTest.cpp b/velox/tool/trace/tests/TableWriterReplayerTest.cpp index ef2c4c54d682d..5e4ec3eddfa96 100644 --- a/velox/tool/trace/tests/TableWriterReplayerTest.cpp +++ b/velox/tool/trace/tests/TableWriterReplayerTest.cpp @@ -24,10 +24,10 @@ #include "velox/common/base/Fs.h" #include "velox/common/file/FileSystems.h" #include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryDataReader.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -297,6 +297,7 @@ TEST_F(TableWriterReplayerTest, basic) { const auto traceOutputDir = TempDirectoryPath::create(); const auto result = TableWriterReplayer( traceRoot, + task->queryCtx()->queryId(), task->taskId(), "1", 0, @@ -422,6 +423,7 @@ TEST_F(TableWriterReplayerTest, partitionWrite) { const auto traceOutputDir = TempDirectoryPath::create(); TableWriterReplayer( traceRoot, + task->queryCtx()->queryId(), task->taskId(), tableWriteNodeId, 0,