Skip to content

Commit

Permalink
Improve operator tracing and make it E2E work (#11360)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11360

This PR improves the operator trace framework and replay tool implementation:
(1) Let driver execution framework to handle the trace input collection and summary file generation.
The finish trace is either called when trace limit exceeds or on operator close instead of no
more input. So it can capture more information in summary like peak memory usage. By removing
the trace input collection into the driver framework it eases the trace input collection for a spilling
operator. We add to capture the peak memory and input rows in summary so it helps identify the
hot operator or task on replay debugging.
(2) Change the trace storage layout to have root with query id for traces from different tasks
(3) Abstract the replay tool function into TraceReplayRunner class and derive in Meta internal code repo
to handle the Meta internal env setup and keep most common part in  TraceReplayRunner for OSS
(4) A couple of fixes in trace replay tool to make it E2E function in Meta for table writer use case
(5) Simplify the trace control logic by throwing if hit trace limit instead of logging in trace summary.
(6) Strict the check if trace plan not is not specified or has specified the wrong node as we are
only supposed to use trace for debugging purpose instead of production query running.
(7) A couple of file/class renaming to make the file/class name to be more specific as currently we only
support operator level trace collection and replay

Reviewed By: tanjialiang

Differential Revision: D64946367
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 30, 2024
1 parent b1834bd commit 28cf060
Show file tree
Hide file tree
Showing 58 changed files with 1,400 additions and 719 deletions.
3 changes: 3 additions & 0 deletions velox/common/base/VeloxException.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ inline constexpr auto kNoCacheSpace = "NO_CACHE_SPACE"_fs;
/// An error raised when spill bytes exceeds limits.
inline constexpr auto kSpillLimitExceeded = "SPILL_LIMIT_EXCEEDED"_fs;

/// An error raised when trace bytes exceeds limits.
inline constexpr auto kTraceLimitExceeded = "TRACE_LIMIT_EXCEEDED"_fs;

/// Errors indicating file read corruptions.
inline constexpr auto kFileCorruption = "FILE_CORRUPTION"_fs;

Expand Down
5 changes: 3 additions & 2 deletions velox/common/file/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
VELOX_CHECK_EQ(
bytesRead,
length,
"fread failure in LocalReadFile::PReadInternal, {} vs {}.",
"fread failure in LocalReadFile::PReadInternal, {} vs {}: {}",
bytesRead,
length);
length,
folly::errnoStr(errno));
}

std::string_view
Expand Down
6 changes: 3 additions & 3 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2355,15 +2355,15 @@ folly::dynamic PlanNode::serialize() const {
return obj;
}

const std::vector<PlanNodePtr>& QueryTraceScanNode::sources() const {
const std::vector<PlanNodePtr>& TraceScanNode::sources() const {
return kEmptySources;
}

std::string QueryTraceScanNode::traceDir() const {
std::string TraceScanNode::traceDir() const {
return traceDir_;
}

void QueryTraceScanNode::addDetails(std::stringstream& stream) const {
void TraceScanNode::addDetails(std::stringstream& stream) const {
stream << "Trace dir: " << traceDir_;
}

Expand Down
17 changes: 13 additions & 4 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,17 @@ class ArrowStreamNode : public PlanNode {
std::shared_ptr<ArrowArrayStream> arrowStream_;
};

class QueryTraceScanNode final : public PlanNode {
class TraceScanNode final : public PlanNode {
public:
QueryTraceScanNode(
TraceScanNode(
const PlanNodeId& id,
const std::string& traceDir,
uint32_t pipelineId,
const RowTypePtr& outputType)
: PlanNode(id), traceDir_(traceDir), outputType_(outputType) {}
: PlanNode(id),
traceDir_(traceDir),
pipelineId_(pipelineId),
outputType_(outputType) {}

const RowTypePtr& outputType() const override {
return outputType_;
Expand All @@ -333,17 +337,22 @@ class QueryTraceScanNode final : public PlanNode {
}

folly::dynamic serialize() const override {
VELOX_UNSUPPORTED("QueryReplayScanNode is not serializable");
VELOX_UNSUPPORTED("TraceScanNode is not serializable");
return nullptr;
}

std::string traceDir() const;

uint32_t pipelineId() const {
return pipelineId_;
}

private:
void addDetails(std::stringstream& stream) const override;

// Directory of traced data, which is $traceRoot/$taskId/$nodeId.
const std::string traceDir_;
const uint32_t pipelineId_;
const RowTypePtr outputType_;
};

Expand Down
13 changes: 5 additions & 8 deletions velox/core/QueryCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,13 @@ void QueryCtx::updateSpilledBytesAndCheckLimit(uint64_t bytes) {
}
}

bool QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) {
if (numTracedBytes_.fetch_add(bytes) + bytes <
void QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) {
if (numTracedBytes_.fetch_add(bytes) + bytes >=
queryConfig_.queryTraceMaxBytes()) {
return false;
VELOX_SPILL_LIMIT_EXCEEDED(fmt::format(
"Query exceeded per-query local trace limit of {}",
succinctBytes(queryConfig_.queryTraceMaxBytes())));
}

numTracedBytes_.fetch_sub(bytes);
LOG(WARNING) << "Query exceeded trace limit of "
<< succinctBytes(queryConfig_.queryTraceMaxBytes());
return true;
}

std::unique_ptr<memory::MemoryReclaimer> QueryCtx::MemoryReclaimer::create(
Expand Down
6 changes: 3 additions & 3 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
/// the max spill bytes limit.
void updateSpilledBytesAndCheckLimit(uint64_t bytes);

/// Updates the aggregated trace bytes of this query, and return true if
/// exceeds the max query trace bytes limit.
bool updateTracedBytesAndCheckLimit(uint64_t bytes);
/// Updates the aggregated trace bytes of this query, and throws if exceeds
/// the max query trace bytes limit.
void updateTracedBytesAndCheckLimit(uint64_t bytes);

void testingOverrideMemoryPool(std::shared_ptr<memory::MemoryPool> pool) {
pool_ = std::move(pool);
Expand Down
8 changes: 4 additions & 4 deletions velox/docs/develop/debugging/tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ The tracing framework consists of three components:
Query Trace Writer
^^^^^^^^^^^^^^^^^^

**QueryMetadataWriter** records the query metadata during task creation,
**TaskTraceMetadataWriter** records the query metadata during task creation,
serializes, and writes them into a file in JSON format. There are two kinds
of metadata:

Expand All @@ -66,20 +66,20 @@ of metadata:
- Plan fragment of the task (also known as a plan node tree). It can be serialized
as a JSON object, which is already supported in Velox.

**QueryDataWriter** records the input vectors from the target operator, which are
**OperatorTraceWriter** records the input vectors from the target operator, which are
serialized and written as a binary file.

Query Trace Reader
^^^^^^^^^^^^^^^^^^

**QueryMetadataReader** can load the query metadata JSON file, and extract the query
**TaskTraceMetadataReader** can load the query metadata JSON file, and extract the query
configurations, connector properties, and the plan fragment.

**QueryDataReader** can read and deserialize the input vectors of the target operator.
It is used as the utility to replay the input data as a source operator in the target
operator replay.

**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches,
**NOTE**: `OperatorTraceWriter` serializes and flushes the input vectors in batches,
allowing it to replay the input process using the same sequence of batches.

Query Trace Util
Expand Down
15 changes: 8 additions & 7 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ velox_add_library(
OrderBy.cpp
OutputBuffer.cpp
OutputBufferManager.cpp
QueryDataReader.cpp
QueryDataWriter.cpp
QueryMetadataReader.cpp
QueryMetadataWriter.cpp
QueryTraceConfig.cpp
QueryTraceScan.cpp
QueryTraceUtil.cpp
OperatorTraceReader.cpp
OperatorTraceScan.cpp
OperatorTraceWriter.cpp
TaskTraceReader.cpp
TaskTraceWriter.cpp
Trace.cpp
TraceConfig.cpp
TraceUtil.cpp
PartitionedOutput.cpp
PartitionFunction.cpp
PartitionStreamingWindowBuild.cpp
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ const core::QueryConfig& DriverCtx::queryConfig() const {
return task->queryCtx()->queryConfig();
}

const std::optional<trace::QueryTraceConfig>& DriverCtx::traceConfig() const {
return task->queryTraceConfig();
const std::optional<trace::TraceConfig>& DriverCtx::traceConfig() const {
return task->traceConfig();
}

velox::memory::MemoryPool* DriverCtx::addOperatorPool(
Expand Down Expand Up @@ -618,6 +618,7 @@ StopReason Driver::runInternal(
lockedStats->addInputVector(
resultBytes, intermediateResult->size());
}
nextOp->traceInput(intermediateResult);
TestValue::adjust(
"facebook::velox::exec::Driver::runInternal::addInput",
nextOp);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "velox/common/time/CpuWallTimer.h"
#include "velox/core/PlanFragment.h"
#include "velox/core/QueryCtx.h"
#include "velox/exec/QueryTraceConfig.h"
#include "velox/exec/TraceConfig.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -291,7 +291,7 @@ struct DriverCtx {

const core::QueryConfig& queryConfig() const;

const std::optional<trace::QueryTraceConfig>& traceConfig() const;
const std::optional<trace::TraceConfig>& traceConfig() const;

velox::memory::MemoryPool* addOperatorPool(
const core::PlanNodeId& planNodeId,
Expand Down
1 change: 0 additions & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
}

void HashAggregation::addInput(RowVectorPtr input) {
traceInput(input);
if (!pushdownChecked_) {
mayPushdown_ = operatorCtx_->driver()->mayPushdownAggregation(this);
pushdownChecked_ = true;
Expand Down
11 changes: 5 additions & 6 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
#include "velox/exec/MergeJoin.h"
#include "velox/exec/NestedLoopJoinBuild.h"
#include "velox/exec/NestedLoopJoinProbe.h"
#include "velox/exec/OperatorTraceScan.h"
#include "velox/exec/OrderBy.h"
#include "velox/exec/PartitionedOutput.h"
#include "velox/exec/QueryTraceScan.h"
#include "velox/exec/RowNumber.h"
#include "velox/exec/StreamingAggregation.h"
#include "velox/exec/TableScan.h"
Expand Down Expand Up @@ -595,11 +595,10 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
assignUniqueIdNode->taskUniqueId(),
assignUniqueIdNode->uniqueIdCounter()));
} else if (
const auto queryReplayScanNode =
std::dynamic_pointer_cast<const core::QueryTraceScanNode>(
planNode)) {
operators.push_back(std::make_unique<trace::QueryTraceScan>(
id, ctx.get(), queryReplayScanNode));
const auto traceScanNode =
std::dynamic_pointer_cast<const core::TraceScanNode>(planNode)) {
operators.push_back(std::make_unique<trace::OperatorTraceScan>(
id, ctx.get(), traceScanNode));
} else {
std::unique_ptr<Operator> extended;
if (planNode->requiresExchangeClient()) {
Expand Down
39 changes: 20 additions & 19 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/exec/Driver.h"
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/Task.h"
#include "velox/exec/TraceUtil.h"
#include "velox/expression/Expr.h"

using facebook::velox::common::testutil::TestValue;
Expand Down Expand Up @@ -107,17 +105,13 @@ void Operator::maybeSetReclaimer() {
}

void Operator::maybeSetTracer() {
const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!queryTraceConfig.has_value()) {
return;
}

if (operatorCtx_->driverCtx()->queryConfig().queryTraceMaxBytes() == 0) {
const auto& traceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!traceConfig.has_value()) {
return;
}

const auto nodeId = planNodeId();
if (queryTraceConfig->queryNodes.count(nodeId) == 0) {
if (traceConfig->queryNodes.count(nodeId) == 0) {
return;
}

Expand All @@ -134,20 +128,17 @@ void Operator::maybeSetTracer() {

const auto pipelineId = operatorCtx_->driverCtx()->pipelineId;
const auto driverId = operatorCtx_->driverCtx()->driverId;
LOG(INFO) << "Trace data for operator type: " << operatorType()
LOG(INFO) << "Trace input for operator type: " << operatorType()
<< ", operator id: " << operatorId() << ", pipeline: " << pipelineId
<< ", driver: " << driverId << ", task: " << taskId();
const auto opTraceDirPath = fmt::format(
"{}/{}/{}/{}/data",
queryTraceConfig->queryTraceDir,
planNodeId(),
pipelineId,
driverId);
const auto opTraceDirPath = trace::getOpTraceDirectory(
traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId);
trace::createTraceDirectory(opTraceDirPath);
inputTracer_ = std::make_unique<trace::QueryDataWriter>(
inputTracer_ = std::make_unique<trace::OperatorTraceWriter>(
this,
opTraceDirPath,
memory::traceMemoryPool(),
queryTraceConfig->updateAndCheckTraceLimitCB);
traceConfig->updateAndCheckTraceLimitCB);
}

void Operator::traceInput(const RowVectorPtr& input) {
Expand Down Expand Up @@ -319,6 +310,16 @@ OperatorStats Operator::stats(bool clear) {
return stats;
}

void Operator::close() {
input_ = nullptr;
results_.clear();
recordSpillStats();
finishTrace();

// Release the unused memory reservation on close.
operatorCtx_->pool()->release();
}

vector_size_t Operator::outputBatchRows(
std::optional<uint64_t> averageRowSize) const {
const auto& queryConfig = operatorCtx_->task()->queryCtx()->queryConfig();
Expand Down
13 changes: 3 additions & 10 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "velox/core/PlanNode.h"
#include "velox/exec/Driver.h"
#include "velox/exec/JoinBridge.h"
#include "velox/exec/QueryDataWriter.h"
#include "velox/exec/OperatorTraceWriter.h"
#include "velox/exec/Spiller.h"
#include "velox/type/Filter.h"

Expand Down Expand Up @@ -408,7 +408,6 @@ class Operator : public BaseRuntimeStatWriter {
/// e.g. the first operator in the pipeline.
virtual void noMoreInput() {
noMoreInput_ = true;
finishTrace();
}

/// Returns a RowVector with the result columns. Returns nullptr if
Expand Down Expand Up @@ -483,13 +482,7 @@ class Operator : public BaseRuntimeStatWriter {

/// Frees all resources associated with 'this'. No other methods
/// should be called after this.
virtual void close() {
input_ = nullptr;
results_.clear();
recordSpillStats();
// Release the unused memory reservation on close.
operatorCtx_->pool()->release();
}
virtual void close();

// Returns true if 'this' never has more output rows than input rows.
virtual bool isFilter() const {
Expand Down Expand Up @@ -781,7 +774,7 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::QueryDataWriter> inputTracer_;
std::unique_ptr<trace::OperatorTraceWriter> inputTracer_;

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand Down
Loading

0 comments on commit 28cf060

Please sign in to comment.