Skip to content

Commit

Permalink
Trace metadata during task creation (facebookincubator#10815)
Browse files Browse the repository at this point in the history
Summary:
Create a directory named `$QueryTraceBaseDir/$taskId` when a task is initiated,
if query tracing is enabled. This directory will store metadata related to the task,
including the query plan node tree, query configurations, and connector properties.

Part of facebookincubator#9668

Pull Request resolved: facebookincubator#10815

Reviewed By: Yuhta

Differential Revision: D61808438

Pulled By: xiaoxmeng

fbshipit-source-id: 57eff8f4b70405ba5c60fcd8315b025b22c2317b
  • Loading branch information
duanmeng authored and Joe-Abraham committed Sep 3, 2024
1 parent 32638e3 commit 14d7200
Show file tree
Hide file tree
Showing 18 changed files with 405 additions and 52 deletions.
5 changes: 5 additions & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
.coreOnAllocationFailureEnabled =
options.coreOnAllocationFailureEnabled})},
spillPool_{addLeafPool("__sys_spilling__")},
tracePool_{addLeafPool("__sys_tracing__")},
sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) {
VELOX_CHECK_NOT_NULL(allocator_);
VELOX_CHECK_NOT_NULL(arbitrator_);
Expand Down Expand Up @@ -427,4 +428,8 @@ memory::MemoryPool* spillMemoryPool() {
bool isSpillMemoryPool(memory::MemoryPool* pool) {
return pool == spillMemoryPool();
}

memory::MemoryPool* traceMemoryPool() {
return memory::MemoryManager::getInstance()->tracePool();
}
} // namespace facebook::velox::memory
9 changes: 9 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@ class MemoryManager {
return spillPool_.get();
}

/// Returns the process wide leaf memory pool used for query tracing.
MemoryPool* tracePool() const {
return tracePool_.get();
}

const std::vector<std::shared_ptr<MemoryPool>>& testingSharedLeafPools() {
return sharedLeafPools_;
}
Expand Down Expand Up @@ -374,6 +379,7 @@ class MemoryManager {

const std::shared_ptr<MemoryPool> sysRoot_;
const std::shared_ptr<MemoryPool> spillPool_;
const std::shared_ptr<MemoryPool> tracePool_;
const std::vector<std::shared_ptr<MemoryPool>> sharedLeafPools_;

mutable folly::SharedMutex mutex_;
Expand Down Expand Up @@ -420,6 +426,9 @@ memory::MemoryPool* spillMemoryPool();
/// Returns true if the provided 'pool' is the spilling memory pool.
bool isSpillMemoryPool(memory::MemoryPool* pool);

/// Returns the system-wide memory pool for tracing memory usage.
memory::MemoryPool* traceMemoryPool();

FOLLY_ALWAYS_INLINE int32_t alignmentPadding(void* address, int32_t alignment) {
auto extra = reinterpret_cast<uintptr_t>(address) % alignment;
return extra == 0 ? 0 : alignment - extra;
Expand Down
63 changes: 33 additions & 30 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TEST_F(MemoryManagerTest, ctor) {
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools;
{
MemoryManager manager{};
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.capacity(), kMaxMemory);
ASSERT_EQ(0, manager.getTotalBytes());
ASSERT_EQ(manager.alignment(), MemoryAllocator::kMaxAlignment);
Expand All @@ -69,7 +69,7 @@ TEST_F(MemoryManagerTest, ctor) {
.arbitratorCapacity = kCapacity,
.arbitratorReservedCapacity = 0}};
ASSERT_EQ(kCapacity, manager.capacity());
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment());
}
{
Expand All @@ -84,7 +84,7 @@ TEST_F(MemoryManagerTest, ctor) {
ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment());
// TODO: replace with root pool memory tracker quota check.
ASSERT_EQ(
kSharedPoolCount + 1, manager.testingDefaultRoot().getChildCount());
kSharedPoolCount + 2, manager.testingDefaultRoot().getChildCount());
ASSERT_EQ(kCapacity, manager.capacity());
ASSERT_EQ(0, manager.getTotalBytes());
}
Expand All @@ -103,7 +103,7 @@ TEST_F(MemoryManagerTest, ctor) {
ASSERT_EQ(
manager.toString(),
"Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of "
"pools 1\nList of root pools:\n\t__sys_root__\n"
"pools 2\nList of root pools:\n\t__sys_root__\n"
"Memory Allocator[MALLOC capacity 4.00GB allocated bytes 0 "
"allocated pages 0 mapped pages 0]\n"
"ARBITRATOR[SHARED CAPACITY[4.00GB] PENDING[0] "
Expand Down Expand Up @@ -246,10 +246,10 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) {
TEST_F(MemoryManagerTest, defaultMemoryManager) {
auto& managerA = toMemoryManager(deprecatedDefaultMemoryManager());
auto& managerB = toMemoryManager(deprecatedDefaultMemoryManager());
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1;
ASSERT_EQ(managerA.numPools(), 1);
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
ASSERT_EQ(managerA.numPools(), 2);
ASSERT_EQ(managerA.testingDefaultRoot().getChildCount(), kSharedPoolCount);
ASSERT_EQ(managerB.numPools(), 1);
ASSERT_EQ(managerB.numPools(), 2);
ASSERT_EQ(managerB.testingDefaultRoot().getChildCount(), kSharedPoolCount);

auto child1 = managerA.addLeafPool("child_1");
Expand All @@ -260,41 +260,44 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) {
kSharedPoolCount + 2, managerA.testingDefaultRoot().getChildCount());
EXPECT_EQ(
kSharedPoolCount + 2, managerB.testingDefaultRoot().getChildCount());
ASSERT_EQ(managerA.numPools(), 3);
ASSERT_EQ(managerB.numPools(), 3);
auto pool = managerB.addRootPool();
ASSERT_EQ(managerA.numPools(), 4);
ASSERT_EQ(managerB.numPools(), 4);
auto pool = managerB.addRootPool();
ASSERT_EQ(managerA.numPools(), 5);
ASSERT_EQ(managerB.numPools(), 5);
ASSERT_EQ(
managerA.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
ASSERT_EQ(
managerB.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
child1.reset();
EXPECT_EQ(
kSharedPoolCount + 1, managerA.testingDefaultRoot().getChildCount());
child2.reset();
EXPECT_EQ(kSharedPoolCount, managerB.testingDefaultRoot().getChildCount());
ASSERT_EQ(managerA.numPools(), 3);
ASSERT_EQ(managerB.numPools(), 3);
pool.reset();
ASSERT_EQ(managerA.numPools(), 2);
ASSERT_EQ(managerB.numPools(), 2);
pool.reset();
ASSERT_EQ(managerA.numPools(), 1);
ASSERT_EQ(managerB.numPools(), 1);
ASSERT_EQ(
managerA.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
ASSERT_EQ(
managerB.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
const std::string detailedManagerStr = managerA.toString(true);
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr(
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n"));
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n"));
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr("__sys_spilling__ usage 0B reserved 0B peak 0B\n"));
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr("__sys_tracing__ usage 0B reserved 0B peak 0B\n"));
for (int i = 0; i < 32; ++i) {
ASSERT_THAT(
managerA.toString(true),
Expand All @@ -306,7 +309,7 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) {
// TODO: remove this test when remove deprecatedAddDefaultLeafMemoryPool.
TEST(MemoryHeaderTest, addDefaultLeafMemoryPool) {
auto& manager = toMemoryManager(deprecatedDefaultMemoryManager());
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1;
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
ASSERT_EQ(manager.testingDefaultRoot().getChildCount(), kSharedPoolCount);
{
auto poolA = deprecatedAddDefaultLeafMemoryPool();
Expand Down Expand Up @@ -361,7 +364,7 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) {
MemoryManagerOptions options;
options.alignment = alignment;
MemoryManager manager{options};
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
const int numPools = 100;
std::vector<std::shared_ptr<MemoryPool>> userRootPools;
std::vector<std::shared_ptr<MemoryPool>> userLeafPools;
Expand All @@ -386,14 +389,14 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) {
ASSERT_FALSE(rootUnamedPool->name().empty());
ASSERT_EQ(rootUnamedPool->kind(), MemoryPool::Kind::kAggregate);
ASSERT_EQ(rootUnamedPool->parent(), nullptr);
ASSERT_EQ(manager.numPools(), 1 + numPools + 2);
ASSERT_EQ(manager.numPools(), 1 + numPools + 2 + 1);
userLeafPools.clear();
leafUnamedPool.reset();
ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1);
ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1);
userRootPools.clear();
ASSERT_EQ(manager.numPools(), 1 + 1);
ASSERT_EQ(manager.numPools(), 1 + 2);
rootUnamedPool.reset();
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
}

// TODO: when run sequentially, e.g. `buck run dwio/memory/...`, this has side
Expand All @@ -410,7 +413,7 @@ TEST_F(MemoryManagerTest, globalMemoryManager) {
ASSERT_NE(manager, globalManager);
ASSERT_EQ(manager, memoryManager());
auto* managerII = memoryManager();
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1;
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
{
auto& rootI = manager->testingDefaultRoot();
const std::string childIName("some_child");
Expand Down Expand Up @@ -444,9 +447,9 @@ TEST_F(MemoryManagerTest, globalMemoryManager) {
ASSERT_EQ(userRootChild->kind(), MemoryPool::Kind::kAggregate);
ASSERT_EQ(rootI.getChildCount(), kSharedPoolCount + 1);
ASSERT_EQ(rootII.getChildCount(), kSharedPoolCount + 1);
ASSERT_EQ(manager->numPools(), 2 + 1);
ASSERT_EQ(manager->numPools(), 2 + 2);
}
ASSERT_EQ(manager->numPools(), 1);
ASSERT_EQ(manager->numPools(), 2);
}

TEST_F(MemoryManagerTest, alignmentOptionCheck) {
Expand Down Expand Up @@ -544,9 +547,9 @@ TEST_F(MemoryManagerTest, concurrentPoolAccess) {
}
stopCheck = true;
checkThread.join();
ASSERT_EQ(manager.numPools(), pools.size() + 1);
ASSERT_EQ(manager.numPools(), pools.size() + 2);
pools.clear();
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
}

TEST_F(MemoryManagerTest, quotaEnforcement) {
Expand Down Expand Up @@ -654,7 +657,7 @@ TEST_F(MemoryManagerTest, disableMemoryPoolTracking) {
ASSERT_EQ(manager.capacity(), 64LL << 20);
ASSERT_EQ(manager.shrinkPools(), 0);
// Default 1 system pool with 1 leaf child
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);

VELOX_ASSERT_THROW(
leaf0->allocate(38LL << 20), "Exceeded memory pool capacity");
Expand Down
25 changes: 25 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,16 @@ class QueryConfig {
/// derived using micro-benchmarking.
static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows";

/// Enable query tracing flag.
static constexpr const char* kQueryTraceEnabled = "query_trace_enabled";

/// Base dir of a query to store tracing data.
static constexpr const char* kQueryTraceDir = "query_trace_dir";

/// A comma-separated list of plan node ids whose input data will be traced.
/// Empty string if only want to trace the query metadata.
static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids";

uint64_t queryMaxMemoryPerNode() const {
return config::toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"),
Expand Down Expand Up @@ -611,6 +621,21 @@ class QueryConfig {
return get<int32_t>(kSpillableReservationGrowthPct, kDefaultPct);
}

/// Returns true if query tracing is enabled.
bool queryTraceEnabled() const {
return get<bool>(kQueryTraceEnabled, false);
}

std::string queryTraceDir() const {
// The default query trace dir, empty by default.
return get<std::string>(kQueryTraceDir, "");
}

std::string queryTraceNodeIds() const {
// The default query trace nodes, empty by default.
return get<std::string>(kQueryTraceNodeIds, "");
}

bool prestoArrayAggIgnoreNulls() const {
return get<bool>(kPrestoArrayAggIgnoreNulls, false);
}
Expand Down
25 changes: 25 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -686,4 +686,29 @@ Spark-specific Configuration
the value of this config can not exceed the default value.
* - spark.partition_id
- integer
-
- The current task's Spark partition ID. It's set by the query engine (Spark) prior to task execution.

Tracing
--------
.. list-table::
:widths: 30 10 10 70
:header-rows: 1

* - Property Name
- Type
- Default Value
- Description
* - query_trace_enabled
- bool
- true
- If true, enable query tracing.
* - query_trace_dir
- string
-
- The root directory to store the tracing data and metadata for a query.
* - query_trace_node_ids
- string
-
- A comma-separated list of plan node ids whose input data will be trace. If it is empty, then we only trace the
query metadata which includes the query plan and configs etc.
42 changes: 41 additions & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
#include "velox/exec/HashBuild.h"
#include "velox/exec/LocalPlanner.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/exec/Merge.h"
#include "velox/exec/NestedLoopJoinBuild.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/Task.h"
#include "velox/exec/trace/QueryTraceUtil.h"

using facebook::velox::common::testutil::TestValue;

Expand Down Expand Up @@ -293,6 +293,7 @@ Task::Task(
planFragment_(std::move(planFragment)),
destination_(destination),
queryCtx_(std::move(queryCtx)),
traceConfig_(maybeMakeTraceConfig()),
mode_(mode),
consumerSupplier_(std::move(consumerSupplier)),
onError_(std::move(onError)),
Expand All @@ -304,6 +305,8 @@ Task::Task(
VELOX_CHECK_NULL(
dynamic_cast<const folly::InlineLikeExecutor*>(queryCtx_->executor()));
}

maybeInitQueryTrace();
}

Task::~Task() {
Expand Down Expand Up @@ -2833,6 +2836,43 @@ std::shared_ptr<ExchangeClient> Task::getExchangeClientLocked(
return exchangeClients_[pipelineId];
}

std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
const auto& queryConfig = queryCtx_->queryConfig();
if (!queryConfig.queryTraceEnabled()) {
return std::nullopt;
}

VELOX_USER_CHECK(
!queryConfig.queryTraceDir().empty(),
"Query trace enabled but the trace dir is not set");

const auto queryTraceNodes = queryConfig.queryTraceNodeIds();
if (queryTraceNodes.empty()) {
return trace::QueryTraceConfig(queryConfig.queryTraceDir());
}

std::vector<std::string> nodes;
folly::split(',', queryTraceNodes, nodes);
std::unordered_set<std::string> nodeSet(nodes.begin(), nodes.end());
VELOX_CHECK_EQ(nodeSet.size(), nodes.size());
LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes;
return trace::QueryTraceConfig(
std::move(nodeSet), queryConfig.queryTraceDir());
}

void Task::maybeInitQueryTrace() {
if (!traceConfig_) {
return;
}

const auto traceTaskDir =
fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_);
trace::createTraceDirectory(traceTaskDir);
const auto queryMetadatWriter = std::make_unique<trace::QueryMetadataWriter>(
traceTaskDir, memory::traceMemoryPool());
queryMetadatWriter->write(queryCtx_, planFragment_.planNode);
}

void Task::testingVisitDrivers(const std::function<void(Driver*)>& callback) {
std::lock_guard<std::timed_mutex> l(mutex_);
for (int i = 0; i < drivers_.size(); ++i) {
Expand Down
Loading

0 comments on commit 14d7200

Please sign in to comment.