From 14d7200d7862540c5ff63c48183cc820a2ffc1ae Mon Sep 17 00:00:00 2001 From: duanmeng Date: Tue, 27 Aug 2024 02:47:41 -0700 Subject: [PATCH] Trace metadata during task creation (#10815) Summary: Create a directory named `$QueryTraceBaseDir/$taskId` when a task is initiated, if query tracing is enabled. This directory will store metadata related to the task, including the query plan node tree, query configurations, and connector properties. Part of https://github.com/facebookincubator/velox/issues/9668 Pull Request resolved: https://github.com/facebookincubator/velox/pull/10815 Reviewed By: Yuhta Differential Revision: D61808438 Pulled By: xiaoxmeng fbshipit-source-id: 57eff8f4b70405ba5c60fcd8315b025b22c2317b --- velox/common/memory/Memory.cpp | 5 + velox/common/memory/Memory.h | 9 + .../common/memory/tests/MemoryManagerTest.cpp | 63 +++---- velox/core/QueryConfig.h | 25 +++ velox/docs/configs.rst | 25 +++ velox/exec/Task.cpp | 42 ++++- velox/exec/Task.h | 10 ++ velox/exec/trace/CMakeLists.txt | 16 +- velox/exec/trace/QueryDataReader.cpp | 3 - velox/exec/trace/QueryDataWriter.cpp | 2 - velox/exec/trace/QueryMetadataReader.cpp | 4 - velox/exec/trace/QueryMetadataWriter.cpp | 3 - velox/exec/trace/QueryTraceConfig.cpp | 5 + velox/exec/trace/QueryTraceConfig.h | 6 +- velox/exec/trace/QueryTraceUtil.cpp | 38 ++++ velox/exec/trace/QueryTraceUtil.h | 26 +++ velox/exec/trace/test/CMakeLists.txt | 9 +- velox/exec/trace/test/QueryTraceTest.cpp | 166 ++++++++++++++++++ 18 files changed, 405 insertions(+), 52 deletions(-) create mode 100644 velox/exec/trace/QueryTraceUtil.cpp create mode 100644 velox/exec/trace/QueryTraceUtil.h diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 15213843b7a2..c42b3912b980 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -157,6 +157,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options) .coreOnAllocationFailureEnabled = options.coreOnAllocationFailureEnabled})}, spillPool_{addLeafPool("__sys_spilling__")}, + tracePool_{addLeafPool("__sys_tracing__")}, sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) { VELOX_CHECK_NOT_NULL(allocator_); VELOX_CHECK_NOT_NULL(arbitrator_); @@ -427,4 +428,8 @@ memory::MemoryPool* spillMemoryPool() { bool isSpillMemoryPool(memory::MemoryPool* pool) { return pool == spillMemoryPool(); } + +memory::MemoryPool* traceMemoryPool() { + return memory::MemoryManager::getInstance()->tracePool(); +} } // namespace facebook::velox::memory diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index ac1085f473bf..ac176fb2f5bd 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -342,6 +342,11 @@ class MemoryManager { return spillPool_.get(); } + /// Returns the process wide leaf memory pool used for query tracing. + MemoryPool* tracePool() const { + return tracePool_.get(); + } + const std::vector>& testingSharedLeafPools() { return sharedLeafPools_; } @@ -374,6 +379,7 @@ class MemoryManager { const std::shared_ptr sysRoot_; const std::shared_ptr spillPool_; + const std::shared_ptr tracePool_; const std::vector> sharedLeafPools_; mutable folly::SharedMutex mutex_; @@ -420,6 +426,9 @@ memory::MemoryPool* spillMemoryPool(); /// Returns true if the provided 'pool' is the spilling memory pool. bool isSpillMemoryPool(memory::MemoryPool* pool); +/// Returns the system-wide memory pool for tracing memory usage. +memory::MemoryPool* traceMemoryPool(); + FOLLY_ALWAYS_INLINE int32_t alignmentPadding(void* address, int32_t alignment) { auto extra = reinterpret_cast(address) % alignment; return extra == 0 ? 0 : alignment - extra; diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index 02ac9a11fb2a..d3da086e9b86 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -53,7 +53,7 @@ TEST_F(MemoryManagerTest, ctor) { const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools; { MemoryManager manager{}; - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); ASSERT_EQ(manager.capacity(), kMaxMemory); ASSERT_EQ(0, manager.getTotalBytes()); ASSERT_EQ(manager.alignment(), MemoryAllocator::kMaxAlignment); @@ -69,7 +69,7 @@ TEST_F(MemoryManagerTest, ctor) { .arbitratorCapacity = kCapacity, .arbitratorReservedCapacity = 0}}; ASSERT_EQ(kCapacity, manager.capacity()); - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); } { @@ -84,7 +84,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); // TODO: replace with root pool memory tracker quota check. ASSERT_EQ( - kSharedPoolCount + 1, manager.testingDefaultRoot().getChildCount()); + kSharedPoolCount + 2, manager.testingDefaultRoot().getChildCount()); ASSERT_EQ(kCapacity, manager.capacity()); ASSERT_EQ(0, manager.getTotalBytes()); } @@ -103,7 +103,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ( manager.toString(), "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of " - "pools 1\nList of root pools:\n\t__sys_root__\n" + "pools 2\nList of root pools:\n\t__sys_root__\n" "Memory Allocator[MALLOC capacity 4.00GB allocated bytes 0 " "allocated pages 0 mapped pages 0]\n" "ARBITRATOR[SHARED CAPACITY[4.00GB] PENDING[0] " @@ -246,10 +246,10 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) { TEST_F(MemoryManagerTest, defaultMemoryManager) { auto& managerA = toMemoryManager(deprecatedDefaultMemoryManager()); auto& managerB = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1; - ASSERT_EQ(managerA.numPools(), 1); + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; + ASSERT_EQ(managerA.numPools(), 2); ASSERT_EQ(managerA.testingDefaultRoot().getChildCount(), kSharedPoolCount); - ASSERT_EQ(managerB.numPools(), 1); + ASSERT_EQ(managerB.numPools(), 2); ASSERT_EQ(managerB.testingDefaultRoot().getChildCount(), kSharedPoolCount); auto child1 = managerA.addLeafPool("child_1"); @@ -260,41 +260,44 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { kSharedPoolCount + 2, managerA.testingDefaultRoot().getChildCount()); EXPECT_EQ( kSharedPoolCount + 2, managerB.testingDefaultRoot().getChildCount()); - ASSERT_EQ(managerA.numPools(), 3); - ASSERT_EQ(managerB.numPools(), 3); - auto pool = managerB.addRootPool(); ASSERT_EQ(managerA.numPools(), 4); ASSERT_EQ(managerB.numPools(), 4); + auto pool = managerB.addRootPool(); + ASSERT_EQ(managerA.numPools(), 5); + ASSERT_EQ(managerB.numPools(), 5); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); child1.reset(); EXPECT_EQ( kSharedPoolCount + 1, managerA.testingDefaultRoot().getChildCount()); child2.reset(); EXPECT_EQ(kSharedPoolCount, managerB.testingDefaultRoot().getChildCount()); + ASSERT_EQ(managerA.numPools(), 3); + ASSERT_EQ(managerB.numPools(), 3); + pool.reset(); ASSERT_EQ(managerA.numPools(), 2); ASSERT_EQ(managerB.numPools(), 2); - pool.reset(); - ASSERT_EQ(managerA.numPools(), 1); - ASSERT_EQ(managerB.numPools(), 1); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); const std::string detailedManagerStr = managerA.toString(true); ASSERT_THAT( detailedManagerStr, testing::HasSubstr( - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); ASSERT_THAT( detailedManagerStr, testing::HasSubstr("__sys_spilling__ usage 0B reserved 0B peak 0B\n")); + ASSERT_THAT( + detailedManagerStr, + testing::HasSubstr("__sys_tracing__ usage 0B reserved 0B peak 0B\n")); for (int i = 0; i < 32; ++i) { ASSERT_THAT( managerA.toString(true), @@ -306,7 +309,7 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { // TODO: remove this test when remove deprecatedAddDefaultLeafMemoryPool. TEST(MemoryHeaderTest, addDefaultLeafMemoryPool) { auto& manager = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; ASSERT_EQ(manager.testingDefaultRoot().getChildCount(), kSharedPoolCount); { auto poolA = deprecatedAddDefaultLeafMemoryPool(); @@ -361,7 +364,7 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { MemoryManagerOptions options; options.alignment = alignment; MemoryManager manager{options}; - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); const int numPools = 100; std::vector> userRootPools; std::vector> userLeafPools; @@ -386,14 +389,14 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { ASSERT_FALSE(rootUnamedPool->name().empty()); ASSERT_EQ(rootUnamedPool->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootUnamedPool->parent(), nullptr); - ASSERT_EQ(manager.numPools(), 1 + numPools + 2); + ASSERT_EQ(manager.numPools(), 1 + numPools + 2 + 1); userLeafPools.clear(); leafUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1); + ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1); userRootPools.clear(); - ASSERT_EQ(manager.numPools(), 1 + 1); + ASSERT_EQ(manager.numPools(), 1 + 2); rootUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); } // TODO: when run sequentially, e.g. `buck run dwio/memory/...`, this has side @@ -410,7 +413,7 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_NE(manager, globalManager); ASSERT_EQ(manager, memoryManager()); auto* managerII = memoryManager(); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; { auto& rootI = manager->testingDefaultRoot(); const std::string childIName("some_child"); @@ -444,9 +447,9 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_EQ(userRootChild->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootI.getChildCount(), kSharedPoolCount + 1); ASSERT_EQ(rootII.getChildCount(), kSharedPoolCount + 1); - ASSERT_EQ(manager->numPools(), 2 + 1); + ASSERT_EQ(manager->numPools(), 2 + 2); } - ASSERT_EQ(manager->numPools(), 1); + ASSERT_EQ(manager->numPools(), 2); } TEST_F(MemoryManagerTest, alignmentOptionCheck) { @@ -544,9 +547,9 @@ TEST_F(MemoryManagerTest, concurrentPoolAccess) { } stopCheck = true; checkThread.join(); - ASSERT_EQ(manager.numPools(), pools.size() + 1); + ASSERT_EQ(manager.numPools(), pools.size() + 2); pools.clear(); - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); } TEST_F(MemoryManagerTest, quotaEnforcement) { @@ -654,7 +657,7 @@ TEST_F(MemoryManagerTest, disableMemoryPoolTracking) { ASSERT_EQ(manager.capacity(), 64LL << 20); ASSERT_EQ(manager.shrinkPools(), 0); // Default 1 system pool with 1 leaf child - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); VELOX_ASSERT_THROW( leaf0->allocate(38LL << 20), "Exceeded memory pool capacity"); diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 78f51375fa45..bc1a24976a71 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -346,6 +346,16 @@ class QueryConfig { /// derived using micro-benchmarking. static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows"; + /// Enable query tracing flag. + static constexpr const char* kQueryTraceEnabled = "query_trace_enabled"; + + /// Base dir of a query to store tracing data. + static constexpr const char* kQueryTraceDir = "query_trace_dir"; + + /// A comma-separated list of plan node ids whose input data will be traced. + /// Empty string if only want to trace the query metadata. + static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids"; + uint64_t queryMaxMemoryPerNode() const { return config::toCapacity( get(kQueryMaxMemoryPerNode, "0B"), @@ -611,6 +621,21 @@ class QueryConfig { return get(kSpillableReservationGrowthPct, kDefaultPct); } + /// Returns true if query tracing is enabled. + bool queryTraceEnabled() const { + return get(kQueryTraceEnabled, false); + } + + std::string queryTraceDir() const { + // The default query trace dir, empty by default. + return get(kQueryTraceDir, ""); + } + + std::string queryTraceNodeIds() const { + // The default query trace nodes, empty by default. + return get(kQueryTraceNodeIds, ""); + } + bool prestoArrayAggIgnoreNulls() const { return get(kPrestoArrayAggIgnoreNulls, false); } diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index cd782b2d763d..0e4582e1b5c1 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -686,4 +686,29 @@ Spark-specific Configuration the value of this config can not exceed the default value. * - spark.partition_id - integer + - - The current task's Spark partition ID. It's set by the query engine (Spark) prior to task execution. + +Tracing +-------- +.. list-table:: + :widths: 30 10 10 70 + :header-rows: 1 + + * - Property Name + - Type + - Default Value + - Description + * - query_trace_enabled + - bool + - true + - If true, enable query tracing. + * - query_trace_dir + - string + - + - The root directory to store the tracing data and metadata for a query. + * - query_trace_node_ids + - string + - + - A comma-separated list of plan node ids whose input data will be trace. If it is empty, then we only trace the + query metadata which includes the query plan and configs etc. diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 38006f9baf53..3ff2e7321f46 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -27,11 +27,11 @@ #include "velox/exec/HashBuild.h" #include "velox/exec/LocalPlanner.h" #include "velox/exec/MemoryReclaimer.h" -#include "velox/exec/Merge.h" #include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/Task.h" +#include "velox/exec/trace/QueryTraceUtil.h" using facebook::velox::common::testutil::TestValue; @@ -293,6 +293,7 @@ Task::Task( planFragment_(std::move(planFragment)), destination_(destination), queryCtx_(std::move(queryCtx)), + traceConfig_(maybeMakeTraceConfig()), mode_(mode), consumerSupplier_(std::move(consumerSupplier)), onError_(std::move(onError)), @@ -304,6 +305,8 @@ Task::Task( VELOX_CHECK_NULL( dynamic_cast(queryCtx_->executor())); } + + maybeInitQueryTrace(); } Task::~Task() { @@ -2833,6 +2836,43 @@ std::shared_ptr Task::getExchangeClientLocked( return exchangeClients_[pipelineId]; } +std::optional Task::maybeMakeTraceConfig() const { + const auto& queryConfig = queryCtx_->queryConfig(); + if (!queryConfig.queryTraceEnabled()) { + return std::nullopt; + } + + VELOX_USER_CHECK( + !queryConfig.queryTraceDir().empty(), + "Query trace enabled but the trace dir is not set"); + + const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); + if (queryTraceNodes.empty()) { + return trace::QueryTraceConfig(queryConfig.queryTraceDir()); + } + + std::vector nodes; + folly::split(',', queryTraceNodes, nodes); + std::unordered_set nodeSet(nodes.begin(), nodes.end()); + VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); + LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes; + return trace::QueryTraceConfig( + std::move(nodeSet), queryConfig.queryTraceDir()); +} + +void Task::maybeInitQueryTrace() { + if (!traceConfig_) { + return; + } + + const auto traceTaskDir = + fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_); + trace::createTraceDirectory(traceTaskDir); + const auto queryMetadatWriter = std::make_unique( + traceTaskDir, memory::traceMemoryPool()); + queryMetadatWriter->write(queryCtx_, planFragment_.planNode); +} + void Task::testingVisitDrivers(const std::function& callback) { std::lock_guard l(mutex_); for (int i = 0; i < drivers_.size(); ++i) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 34b7c8c12936..a297ca9cb71c 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -24,6 +24,8 @@ #include "velox/exec/Split.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" +#include "velox/exec/trace/QueryMetadataWriter.h" +#include "velox/exec/trace/QueryTraceConfig.h" #include "velox/vector/ComplexVector.h" namespace facebook::velox::exec { @@ -970,6 +972,13 @@ class Task : public std::enable_shared_from_this { std::shared_ptr getExchangeClientLocked( int32_t pipelineId) const; + // Builds the query trace config. + std::optional maybeMakeTraceConfig() const; + + // Create a 'QueryMetadtaWriter' to trace the query metadata if the query + // trace enabled. + void maybeInitQueryTrace(); + // The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_' // on task construction and destruction. class TaskCounter { @@ -1000,6 +1009,7 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment_; const int destination_; const std::shared_ptr queryCtx_; + const std::optional traceConfig_; // The execution mode of the task. It is enforced that a task can only be // executed in a single mode throughout its lifetime diff --git a/velox/exec/trace/CMakeLists.txt b/velox/exec/trace/CMakeLists.txt index 532f3ed27f32..f8cc53e08ae4 100644 --- a/velox/exec/trace/CMakeLists.txt +++ b/velox/exec/trace/CMakeLists.txt @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_query_trace_exec QueryMetadataWriter.cpp QueryTraceConfig.cpp - QueryDataWriter.cpp) +velox_add_library( + velox_query_trace_exec + QueryMetadataWriter.cpp + QueryTraceConfig.cpp + QueryDataWriter.cpp + QueryTraceUtil.cpp) -target_link_libraries( +velox_link_libraries( velox_query_trace_exec PRIVATE velox_common_io @@ -26,10 +30,10 @@ target_link_libraries( velox_common_base velox_presto_serializer) -add_library(velox_query_trace_retrieve QueryDataReader.cpp - QueryMetadataReader.cpp) +velox_add_library(velox_query_trace_retrieve QueryDataReader.cpp + QueryMetadataReader.cpp) -target_link_libraries( +velox_link_libraries( velox_query_trace_retrieve velox_common_io velox_file diff --git a/velox/exec/trace/QueryDataReader.cpp b/velox/exec/trace/QueryDataReader.cpp index b234330dd0e5..f0175fe11b36 100644 --- a/velox/exec/trace/QueryDataReader.cpp +++ b/velox/exec/trace/QueryDataReader.cpp @@ -17,9 +17,6 @@ #include "velox/exec/trace/QueryDataReader.h" #include "velox/common/file/File.h" -#include "velox/connectors/hive/HiveDataSink.h" -#include "velox/connectors/hive/TableHandle.h" -#include "velox/exec/TableWriter.h" #include "velox/exec/trace/QueryTraceTraits.h" namespace facebook::velox::exec::trace { diff --git a/velox/exec/trace/QueryDataWriter.cpp b/velox/exec/trace/QueryDataWriter.cpp index 544e7a3d4580..57dc4284cd10 100644 --- a/velox/exec/trace/QueryDataWriter.cpp +++ b/velox/exec/trace/QueryDataWriter.cpp @@ -18,8 +18,6 @@ #include "velox/common/base/SpillStats.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" -#include "velox/exec/TreeOfLosers.h" -#include "velox/exec/UnorderedStreamReader.h" #include "velox/exec/trace/QueryTraceTraits.h" #include "velox/serializers/PrestoSerializer.h" diff --git a/velox/exec/trace/QueryMetadataReader.cpp b/velox/exec/trace/QueryMetadataReader.cpp index 8843aa49a2ab..99fe7cc8c435 100644 --- a/velox/exec/trace/QueryMetadataReader.cpp +++ b/velox/exec/trace/QueryMetadataReader.cpp @@ -18,11 +18,7 @@ #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" -#include "velox/connectors/hive/HiveDataSink.h" -#include "velox/connectors/hive/TableHandle.h" #include "velox/core/PlanNode.h" -#include "velox/exec/PartitionFunction.h" -#include "velox/exec/TableWriter.h" #include "velox/exec/trace/QueryTraceTraits.h" namespace facebook::velox::exec::trace { diff --git a/velox/exec/trace/QueryMetadataWriter.cpp b/velox/exec/trace/QueryMetadataWriter.cpp index beba2e809746..e14cdb197f44 100644 --- a/velox/exec/trace/QueryMetadataWriter.cpp +++ b/velox/exec/trace/QueryMetadataWriter.cpp @@ -17,11 +17,8 @@ #include "velox/exec/trace/QueryMetadataWriter.h" #include "velox/common/config/Config.h" #include "velox/common/file/File.h" -#include "velox/connectors/hive/HiveDataSink.h" -#include "velox/connectors/hive/TableHandle.h" #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" -#include "velox/exec/TableWriter.h" #include "velox/exec/trace/QueryTraceTraits.h" namespace facebook::velox::exec::trace { diff --git a/velox/exec/trace/QueryTraceConfig.cpp b/velox/exec/trace/QueryTraceConfig.cpp index 2437a632fb43..233226ebccf5 100644 --- a/velox/exec/trace/QueryTraceConfig.cpp +++ b/velox/exec/trace/QueryTraceConfig.cpp @@ -24,4 +24,9 @@ QueryTraceConfig::QueryTraceConfig( : queryNodes(std::move(_queryNodeIds)), queryTraceDir(std::move(_queryTraceDir)) {} +QueryTraceConfig::QueryTraceConfig(std::string _queryTraceDir) + : QueryTraceConfig( + std::unordered_set{}, + std::move(_queryTraceDir)) {} + } // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryTraceConfig.h b/velox/exec/trace/QueryTraceConfig.h index 8afcbf22fb53..0e9c22818c34 100644 --- a/velox/exec/trace/QueryTraceConfig.h +++ b/velox/exec/trace/QueryTraceConfig.h @@ -21,15 +21,17 @@ namespace facebook::velox::exec::trace { struct QueryTraceConfig { - /// Target query trace nodes + /// Target query trace nodes. std::unordered_set queryNodes; - /// Base dir of query trace, normmaly it is $prefix/$taskId. + /// Base dir of query trace. std::string queryTraceDir; QueryTraceConfig( std::unordered_set _queryNodeIds, std::string _queryTraceDir); + QueryTraceConfig(std::string _queryTraceDir); + QueryTraceConfig() = default; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryTraceUtil.cpp b/velox/exec/trace/QueryTraceUtil.cpp new file mode 100644 index 000000000000..437d3eee224e --- /dev/null +++ b/velox/exec/trace/QueryTraceUtil.cpp @@ -0,0 +1,38 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/trace/QueryTraceUtil.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/file/FileSystems.h" + +namespace facebook::velox::exec::trace { + +void createTraceDirectory(const std::string& traceDir) { + try { + const auto fs = filesystems::getFileSystem(traceDir, nullptr); + if (fs->exists(traceDir)) { + fs->rmdir(traceDir); + } + fs->mkdir(traceDir); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to create trace directory '{}' with error: {}", + traceDir, + e.what()); + } +} + +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryTraceUtil.h b/velox/exec/trace/QueryTraceUtil.h new file mode 100644 index 000000000000..826aa35283a2 --- /dev/null +++ b/velox/exec/trace/QueryTraceUtil.h @@ -0,0 +1,26 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace facebook::velox::exec::trace { + +/// Creates a directory to store the query trace metdata and data. +void createTraceDirectory(const std::string& traceDir); + +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/test/CMakeLists.txt b/velox/exec/trace/test/CMakeLists.txt index 9c4ffe9dd358..861b5ffb339a 100644 --- a/velox/exec/trace/test/CMakeLists.txt +++ b/velox/exec/trace/test/CMakeLists.txt @@ -28,4 +28,11 @@ target_link_libraries( velox_memory velox_query_trace_exec velox_query_trace_retrieve - velox_vector_fuzzer) + velox_vector_fuzzer + GTest::gtest_main + GTest::gmock + Folly::folly + gflags::gflags + glog::glog + fmt::fmt + ${FILESYSTEM}) diff --git a/velox/exec/trace/test/QueryTraceTest.cpp b/velox/exec/trace/test/QueryTraceTest.cpp index 765eeccaaf53..8b969cc1b9b6 100644 --- a/velox/exec/trace/test/QueryTraceTest.cpp +++ b/velox/exec/trace/test/QueryTraceTest.cpp @@ -28,6 +28,7 @@ #include "velox/exec/trace/QueryDataWriter.h" #include "velox/exec/trace/QueryMetadataReader.h" #include "velox/exec/trace/QueryMetadataWriter.h" +#include "velox/exec/trace/QueryTraceUtil.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/tests/utils/VectorTestBase.h" @@ -202,4 +203,169 @@ TEST_F(QueryTracerTest, traceMetadata) { ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key)); } } + +TEST_F(QueryTracerTest, task) { + const auto rowType = + ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, + {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); + std::vector rows; + constexpr auto numBatch = 1; + rows.reserve(numBatch); + for (auto i = 0; i < numBatch; ++i) { + rows.push_back(vectorFuzzer_.fuzzRow(rowType, 2)); + } + + auto planNodeIdGenerator = std::make_shared(); + const auto planNode = + PlanBuilder(planNodeIdGenerator) + .values(rows, false) + .project({"c0", "c1", "c2"}) + .hashJoin( + {"c0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator) + .values(rows, true) + .singleAggregation({"c0", "c1"}, {"min(c2)"}) + .project({"c0 AS u0", "c1 AS u1", "a0 AS u2"}) + .planNode(), + "c0 < 135", + {"c0", "c1", "c2"}, + core::JoinType::kInner) + .planNode(); + const auto expectedResult = + AssertQueryBuilder(planNode).maxDrivers(1).copyResults(pool()); + + for (const auto& queryTraceNodeIds : {"1,2", ""}) { + const auto outputDir = TempDirectoryPath::create(); + const auto expectedQueryConfigs = + std::unordered_map{ + {core::QueryConfig::kSpillEnabled, "true"}, + {core::QueryConfig::kSpillNumPartitionBits, "17"}, + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, outputDir->getPath()}, + {core::QueryConfig::kQueryTraceEnabled, queryTraceNodeIds}, + {"key1", "value1"}, + }; + const auto expectedConnectorProperties = + std::unordered_map>{ + {"test_trace", + std::make_shared( + std::unordered_map{ + {"cKey1", "cVal1"}})}}; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), + core::QueryConfig(expectedQueryConfigs), + expectedConnectorProperties); + + std::shared_ptr task; + const auto result = AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool(), task); + assertEqualResults({result}, {expectedResult}); + + const auto expectedDir = + fmt::format("{}/{}", outputDir->getPath(), task->taskId()); + const auto fs = filesystems::getFileSystem(expectedDir, nullptr); + const auto actaulDirs = fs->list(outputDir->getPath()); + ASSERT_EQ(actaulDirs.size(), 1); + ASSERT_EQ(actaulDirs.at(0), expectedDir); + + std::unordered_map acutalQueryConfigs; + std:: + unordered_map> + actualConnectorProperties; + core::PlanNodePtr actualQueryPlan; + auto reader = trace::QueryMetadataReader(expectedDir, pool()); + reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan); + + ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); + ASSERT_EQ(acutalQueryConfigs.size(), expectedQueryConfigs.size()); + for (const auto& [key, value] : acutalQueryConfigs) { + ASSERT_EQ(acutalQueryConfigs.at(key), expectedQueryConfigs.at(key)); + } + + ASSERT_EQ( + actualConnectorProperties.size(), expectedConnectorProperties.size()); + ASSERT_EQ(actualConnectorProperties.count("test_trace"), 1); + const auto expectedConnectorConfigs = + expectedConnectorProperties.at("test_trace")->rawConfigsCopy(); + const auto actualConnectorConfigs = + actualConnectorProperties.at("test_trace"); + for (const auto& [key, value] : actualConnectorConfigs) { + ASSERT_EQ( + actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key)); + } + } +} + +TEST_F(QueryTracerTest, error) { + const auto planNode = PlanBuilder().values({}).planNode(); + const auto expectedQueryConfigs = + std::unordered_map{ + {core::QueryConfig::kSpillEnabled, "true"}, + {core::QueryConfig::kSpillNumPartitionBits, "17"}, + {core::QueryConfig::kQueryTraceEnabled, "true"}, + }; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(expectedQueryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode).queryCtx(queryCtx).maxDrivers(1).copyResults( + pool()), + "Query trace enabled but the trace dir is not set"); +} + +TEST_F(QueryTracerTest, traceDir) { + const auto outputDir = TempDirectoryPath::create(); + const auto rootDir = outputDir->getPath(); + const auto fs = filesystems::getFileSystem(rootDir, nullptr); + auto dir1 = fmt::format("{}/{}", outputDir->getPath(), "t1"); + trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + + auto dir2 = fmt::format("{}/{}", dir1, "t1_1"); + trace::createTraceDirectory(dir2); + ASSERT_TRUE(fs->exists(dir2)); + + // It will remove the old dir1 along with its subdir when created the dir1 + // again. + trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + ASSERT_FALSE(fs->exists(dir2)); + + const auto parentDir = fmt::format("{}/{}", outputDir->getPath(), "p"); + fs->mkdir(parentDir); + + constexpr auto numThreads = 5; + std::vector queryThreads; + queryThreads.reserve(numThreads); + std::set expectedDirs; + for (int i = 0; i < numThreads; ++i) { + queryThreads.emplace_back([&, i]() { + const auto dir = fmt::format("{}/s{}", parentDir, i); + trace::createTraceDirectory(dir); + expectedDirs.insert(dir); + }); + } + + for (auto& queryThread : queryThreads) { + queryThread.join(); + } + + const auto actualDirs = fs->list(parentDir); + ASSERT_EQ(actualDirs.size(), numThreads); + ASSERT_EQ(actualDirs.size(), expectedDirs.size()); + for (const auto& dir : actualDirs) { + ASSERT_EQ(expectedDirs.count(dir), 1); + } +} } // namespace facebook::velox::exec::test + +// This main is needed for some tests on linux. +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + // Signal handler required for ThreadDebugInfoTest + facebook::velox::process::addDefaultFatalSignalHandler(); + folly::Init init(&argc, &argv, false); + return RUN_ALL_TESTS(); +}