diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 15213843b7a2..c42b3912b980 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -157,6 +157,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options) .coreOnAllocationFailureEnabled = options.coreOnAllocationFailureEnabled})}, spillPool_{addLeafPool("__sys_spilling__")}, + tracePool_{addLeafPool("__sys_tracing__")}, sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) { VELOX_CHECK_NOT_NULL(allocator_); VELOX_CHECK_NOT_NULL(arbitrator_); @@ -427,4 +428,8 @@ memory::MemoryPool* spillMemoryPool() { bool isSpillMemoryPool(memory::MemoryPool* pool) { return pool == spillMemoryPool(); } + +memory::MemoryPool* traceMemoryPool() { + return memory::MemoryManager::getInstance()->tracePool(); +} } // namespace facebook::velox::memory diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index ac1085f473bf..ac176fb2f5bd 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -342,6 +342,11 @@ class MemoryManager { return spillPool_.get(); } + /// Returns the process wide leaf memory pool used for query tracing. + MemoryPool* tracePool() const { + return tracePool_.get(); + } + const std::vector>& testingSharedLeafPools() { return sharedLeafPools_; } @@ -374,6 +379,7 @@ class MemoryManager { const std::shared_ptr sysRoot_; const std::shared_ptr spillPool_; + const std::shared_ptr tracePool_; const std::vector> sharedLeafPools_; mutable folly::SharedMutex mutex_; @@ -420,6 +426,9 @@ memory::MemoryPool* spillMemoryPool(); /// Returns true if the provided 'pool' is the spilling memory pool. bool isSpillMemoryPool(memory::MemoryPool* pool); +/// Returns the system-wide memory pool for tracing memory usage. +memory::MemoryPool* traceMemoryPool(); + FOLLY_ALWAYS_INLINE int32_t alignmentPadding(void* address, int32_t alignment) { auto extra = reinterpret_cast(address) % alignment; return extra == 0 ? 
0 : alignment - extra; diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index 02ac9a11fb2a..d3da086e9b86 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -53,7 +53,7 @@ TEST_F(MemoryManagerTest, ctor) { const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools; { MemoryManager manager{}; - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); ASSERT_EQ(manager.capacity(), kMaxMemory); ASSERT_EQ(0, manager.getTotalBytes()); ASSERT_EQ(manager.alignment(), MemoryAllocator::kMaxAlignment); @@ -69,7 +69,7 @@ TEST_F(MemoryManagerTest, ctor) { .arbitratorCapacity = kCapacity, .arbitratorReservedCapacity = 0}}; ASSERT_EQ(kCapacity, manager.capacity()); - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); } { @@ -84,7 +84,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); // TODO: replace with root pool memory tracker quota check. 
ASSERT_EQ( - kSharedPoolCount + 1, manager.testingDefaultRoot().getChildCount()); + kSharedPoolCount + 2, manager.testingDefaultRoot().getChildCount()); ASSERT_EQ(kCapacity, manager.capacity()); ASSERT_EQ(0, manager.getTotalBytes()); } @@ -103,7 +103,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ( manager.toString(), "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of " - "pools 1\nList of root pools:\n\t__sys_root__\n" + "pools 2\nList of root pools:\n\t__sys_root__\n" "Memory Allocator[MALLOC capacity 4.00GB allocated bytes 0 " "allocated pages 0 mapped pages 0]\n" "ARBITRATOR[SHARED CAPACITY[4.00GB] PENDING[0] " @@ -246,10 +246,10 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) { TEST_F(MemoryManagerTest, defaultMemoryManager) { auto& managerA = toMemoryManager(deprecatedDefaultMemoryManager()); auto& managerB = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1; - ASSERT_EQ(managerA.numPools(), 1); + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; + ASSERT_EQ(managerA.numPools(), 2); ASSERT_EQ(managerA.testingDefaultRoot().getChildCount(), kSharedPoolCount); - ASSERT_EQ(managerB.numPools(), 1); + ASSERT_EQ(managerB.numPools(), 2); ASSERT_EQ(managerB.testingDefaultRoot().getChildCount(), kSharedPoolCount); auto child1 = managerA.addLeafPool("child_1"); @@ -260,41 +260,44 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { kSharedPoolCount + 2, managerA.testingDefaultRoot().getChildCount()); EXPECT_EQ( kSharedPoolCount + 2, managerB.testingDefaultRoot().getChildCount()); - ASSERT_EQ(managerA.numPools(), 3); - ASSERT_EQ(managerB.numPools(), 3); - auto pool = managerB.addRootPool(); ASSERT_EQ(managerA.numPools(), 4); ASSERT_EQ(managerB.numPools(), 4); + auto pool = managerB.addRootPool(); + ASSERT_EQ(managerA.numPools(), 5); + ASSERT_EQ(managerB.numPools(), 5); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 
64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); child1.reset(); EXPECT_EQ( kSharedPoolCount + 1, managerA.testingDefaultRoot().getChildCount()); child2.reset(); EXPECT_EQ(kSharedPoolCount, managerB.testingDefaultRoot().getChildCount()); + ASSERT_EQ(managerA.numPools(), 3); + ASSERT_EQ(managerB.numPools(), 3); + pool.reset(); ASSERT_EQ(managerA.numPools(), 2); ASSERT_EQ(managerB.numPools(), 2); - pool.reset(); - ASSERT_EQ(managerA.numPools(), 1); - ASSERT_EQ(managerB.numPools(), 1); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory 
Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); const std::string detailedManagerStr = managerA.toString(true); ASSERT_THAT( detailedManagerStr, testing::HasSubstr( - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); ASSERT_THAT( detailedManagerStr, testing::HasSubstr("__sys_spilling__ usage 0B reserved 0B peak 0B\n")); + ASSERT_THAT( + detailedManagerStr, + testing::HasSubstr("__sys_tracing__ usage 0B reserved 0B peak 0B\n")); for (int i = 0; i < 32; ++i) { ASSERT_THAT( managerA.toString(true), @@ -306,7 +309,7 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { // TODO: remove this test when remove deprecatedAddDefaultLeafMemoryPool. 
TEST(MemoryHeaderTest, addDefaultLeafMemoryPool) { auto& manager = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; ASSERT_EQ(manager.testingDefaultRoot().getChildCount(), kSharedPoolCount); { auto poolA = deprecatedAddDefaultLeafMemoryPool(); @@ -361,7 +364,7 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { MemoryManagerOptions options; options.alignment = alignment; MemoryManager manager{options}; - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); const int numPools = 100; std::vector> userRootPools; std::vector> userLeafPools; @@ -386,14 +389,14 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { ASSERT_FALSE(rootUnamedPool->name().empty()); ASSERT_EQ(rootUnamedPool->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootUnamedPool->parent(), nullptr); - ASSERT_EQ(manager.numPools(), 1 + numPools + 2); + ASSERT_EQ(manager.numPools(), 1 + numPools + 2 + 1); userLeafPools.clear(); leafUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1); + ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1); userRootPools.clear(); - ASSERT_EQ(manager.numPools(), 1 + 1); + ASSERT_EQ(manager.numPools(), 1 + 2); rootUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); } // TODO: when run sequentially, e.g. 
`buck run dwio/memory/...`, this has side @@ -410,7 +413,7 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_NE(manager, globalManager); ASSERT_EQ(manager, memoryManager()); auto* managerII = memoryManager(); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; { auto& rootI = manager->testingDefaultRoot(); const std::string childIName("some_child"); @@ -444,9 +447,9 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_EQ(userRootChild->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootI.getChildCount(), kSharedPoolCount + 1); ASSERT_EQ(rootII.getChildCount(), kSharedPoolCount + 1); - ASSERT_EQ(manager->numPools(), 2 + 1); + ASSERT_EQ(manager->numPools(), 2 + 2); } - ASSERT_EQ(manager->numPools(), 1); + ASSERT_EQ(manager->numPools(), 2); } TEST_F(MemoryManagerTest, alignmentOptionCheck) { @@ -544,9 +547,9 @@ TEST_F(MemoryManagerTest, concurrentPoolAccess) { } stopCheck = true; checkThread.join(); - ASSERT_EQ(manager.numPools(), pools.size() + 1); + ASSERT_EQ(manager.numPools(), pools.size() + 2); pools.clear(); - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); } TEST_F(MemoryManagerTest, quotaEnforcement) { @@ -654,7 +657,7 @@ TEST_F(MemoryManagerTest, disableMemoryPoolTracking) { ASSERT_EQ(manager.capacity(), 64LL << 20); ASSERT_EQ(manager.shrinkPools(), 0); // Default 1 system pool with 1 leaf child - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); VELOX_ASSERT_THROW( leaf0->allocate(38LL << 20), "Exceeded memory pool capacity"); diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 78f51375fa45..bc1a24976a71 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -346,6 +346,16 @@ class QueryConfig { /// derived using micro-benchmarking. static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows"; + /// Enable query tracing flag. 
+ static constexpr const char* kQueryTraceEnabled = "query_trace_enabled"; + + /// Base dir of a query to store tracing data. + static constexpr const char* kQueryTraceDir = "query_trace_dir"; + + /// A comma-separated list of plan node ids whose input data will be traced. + /// Empty string if only want to trace the query metadata. + static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids"; + uint64_t queryMaxMemoryPerNode() const { return config::toCapacity( get(kQueryMaxMemoryPerNode, "0B"), @@ -611,6 +621,21 @@ class QueryConfig { return get(kSpillableReservationGrowthPct, kDefaultPct); } + /// Returns true if query tracing is enabled. + bool queryTraceEnabled() const { + return get(kQueryTraceEnabled, false); + } + + std::string queryTraceDir() const { + // The default query trace dir, empty by default. + return get(kQueryTraceDir, ""); + } + + std::string queryTraceNodeIds() const { + // The default query trace nodes, empty by default. + return get(kQueryTraceNodeIds, ""); + } + bool prestoArrayAggIgnoreNulls() const { return get(kPrestoArrayAggIgnoreNulls, false); } diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index cd782b2d763d..0e4582e1b5c1 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -686,4 +686,29 @@ Spark-specific Configuration the value of this config can not exceed the default value. * - spark.partition_id - integer + - - The current task's Spark partition ID. It's set by the query engine (Spark) prior to task execution. + +Tracing +-------- +.. list-table:: + :widths: 30 10 10 70 + :header-rows: 1 + + * - Property Name + - Type + - Default Value + - Description + * - query_trace_enabled + - bool + - false + - If true, enable query tracing. + * - query_trace_dir + - string + - + - The root directory to store the tracing data and metadata for a query. + * - query_trace_node_ids + - string + - + - A comma-separated list of plan node ids whose input data will be traced.
If it is empty, then we only trace the + query metadata which includes the query plan and configs etc. diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 38006f9baf53..3ff2e7321f46 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -27,11 +27,11 @@ #include "velox/exec/HashBuild.h" #include "velox/exec/LocalPlanner.h" #include "velox/exec/MemoryReclaimer.h" -#include "velox/exec/Merge.h" #include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/Task.h" +#include "velox/exec/trace/QueryTraceUtil.h" using facebook::velox::common::testutil::TestValue; @@ -293,6 +293,7 @@ Task::Task( planFragment_(std::move(planFragment)), destination_(destination), queryCtx_(std::move(queryCtx)), + traceConfig_(maybeMakeTraceConfig()), mode_(mode), consumerSupplier_(std::move(consumerSupplier)), onError_(std::move(onError)), @@ -304,6 +305,8 @@ Task::Task( VELOX_CHECK_NULL( dynamic_cast(queryCtx_->executor())); } + + maybeInitQueryTrace(); } Task::~Task() { @@ -2833,6 +2836,43 @@ std::shared_ptr Task::getExchangeClientLocked( return exchangeClients_[pipelineId]; } +std::optional Task::maybeMakeTraceConfig() const { + const auto& queryConfig = queryCtx_->queryConfig(); + if (!queryConfig.queryTraceEnabled()) { + return std::nullopt; + } + + VELOX_USER_CHECK( + !queryConfig.queryTraceDir().empty(), + "Query trace enabled but the trace dir is not set"); + + const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); + if (queryTraceNodes.empty()) { + return trace::QueryTraceConfig(queryConfig.queryTraceDir()); + } + + std::vector nodes; + folly::split(',', queryTraceNodes, nodes); + std::unordered_set nodeSet(nodes.begin(), nodes.end()); + VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); + LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes; + return trace::QueryTraceConfig( + std::move(nodeSet), queryConfig.queryTraceDir()); +} + +void Task::maybeInitQueryTrace() { + if 
(!traceConfig_) { + return; + } + + const auto traceTaskDir = + fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_); + trace::createTraceDirectory(traceTaskDir); + const auto queryMetadatWriter = std::make_unique( + traceTaskDir, memory::traceMemoryPool()); + queryMetadatWriter->write(queryCtx_, planFragment_.planNode); +} + void Task::testingVisitDrivers(const std::function& callback) { std::lock_guard l(mutex_); for (int i = 0; i < drivers_.size(); ++i) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 34b7c8c12936..a297ca9cb71c 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -24,6 +24,8 @@ #include "velox/exec/Split.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" +#include "velox/exec/trace/QueryMetadataWriter.h" +#include "velox/exec/trace/QueryTraceConfig.h" #include "velox/vector/ComplexVector.h" namespace facebook::velox::exec { @@ -970,6 +972,13 @@ class Task : public std::enable_shared_from_this { std::shared_ptr getExchangeClientLocked( int32_t pipelineId) const; + // Builds the query trace config, or std::nullopt if tracing is disabled. + std::optional maybeMakeTraceConfig() const; + + // Creates a 'QueryMetadataWriter' to trace the query metadata if query + // tracing is enabled. + void maybeInitQueryTrace(); + // The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_' // on task construction and destruction. class TaskCounter { @@ -1000,6 +1009,7 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment_; const int destination_; const std::shared_ptr queryCtx_; + const std::optional traceConfig_; // The execution mode of the task.
It is enforced that a task can only be // executed in a single mode throughout its lifetime diff --git a/velox/exec/trace/CMakeLists.txt b/velox/exec/trace/CMakeLists.txt index 532f3ed27f32..f8cc53e08ae4 100644 --- a/velox/exec/trace/CMakeLists.txt +++ b/velox/exec/trace/CMakeLists.txt @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_query_trace_exec QueryMetadataWriter.cpp QueryTraceConfig.cpp - QueryDataWriter.cpp) +velox_add_library( + velox_query_trace_exec + QueryMetadataWriter.cpp + QueryTraceConfig.cpp + QueryDataWriter.cpp + QueryTraceUtil.cpp) -target_link_libraries( +velox_link_libraries( velox_query_trace_exec PRIVATE velox_common_io @@ -26,10 +30,10 @@ target_link_libraries( velox_common_base velox_presto_serializer) -add_library(velox_query_trace_retrieve QueryDataReader.cpp - QueryMetadataReader.cpp) +velox_add_library(velox_query_trace_retrieve QueryDataReader.cpp + QueryMetadataReader.cpp) -target_link_libraries( +velox_link_libraries( velox_query_trace_retrieve velox_common_io velox_file diff --git a/velox/exec/trace/QueryDataReader.cpp b/velox/exec/trace/QueryDataReader.cpp index b234330dd0e5..f0175fe11b36 100644 --- a/velox/exec/trace/QueryDataReader.cpp +++ b/velox/exec/trace/QueryDataReader.cpp @@ -17,9 +17,6 @@ #include "velox/exec/trace/QueryDataReader.h" #include "velox/common/file/File.h" -#include "velox/connectors/hive/HiveDataSink.h" -#include "velox/connectors/hive/TableHandle.h" -#include "velox/exec/TableWriter.h" #include "velox/exec/trace/QueryTraceTraits.h" namespace facebook::velox::exec::trace { diff --git a/velox/exec/trace/QueryDataWriter.cpp b/velox/exec/trace/QueryDataWriter.cpp index 544e7a3d4580..57dc4284cd10 100644 --- a/velox/exec/trace/QueryDataWriter.cpp +++ b/velox/exec/trace/QueryDataWriter.cpp @@ -18,8 +18,6 @@ #include "velox/common/base/SpillStats.h" #include "velox/common/file/File.h" #include 
"velox/common/file/FileSystems.h" -#include "velox/exec/TreeOfLosers.h" -#include "velox/exec/UnorderedStreamReader.h" #include "velox/exec/trace/QueryTraceTraits.h" #include "velox/serializers/PrestoSerializer.h" diff --git a/velox/exec/trace/QueryMetadataReader.cpp b/velox/exec/trace/QueryMetadataReader.cpp index 8843aa49a2ab..99fe7cc8c435 100644 --- a/velox/exec/trace/QueryMetadataReader.cpp +++ b/velox/exec/trace/QueryMetadataReader.cpp @@ -18,11 +18,7 @@ #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" -#include "velox/connectors/hive/HiveDataSink.h" -#include "velox/connectors/hive/TableHandle.h" #include "velox/core/PlanNode.h" -#include "velox/exec/PartitionFunction.h" -#include "velox/exec/TableWriter.h" #include "velox/exec/trace/QueryTraceTraits.h" namespace facebook::velox::exec::trace { diff --git a/velox/exec/trace/QueryMetadataWriter.cpp b/velox/exec/trace/QueryMetadataWriter.cpp index beba2e809746..e14cdb197f44 100644 --- a/velox/exec/trace/QueryMetadataWriter.cpp +++ b/velox/exec/trace/QueryMetadataWriter.cpp @@ -17,11 +17,8 @@ #include "velox/exec/trace/QueryMetadataWriter.h" #include "velox/common/config/Config.h" #include "velox/common/file/File.h" -#include "velox/connectors/hive/HiveDataSink.h" -#include "velox/connectors/hive/TableHandle.h" #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" -#include "velox/exec/TableWriter.h" #include "velox/exec/trace/QueryTraceTraits.h" namespace facebook::velox::exec::trace { diff --git a/velox/exec/trace/QueryTraceConfig.cpp b/velox/exec/trace/QueryTraceConfig.cpp index 2437a632fb43..233226ebccf5 100644 --- a/velox/exec/trace/QueryTraceConfig.cpp +++ b/velox/exec/trace/QueryTraceConfig.cpp @@ -24,4 +24,9 @@ QueryTraceConfig::QueryTraceConfig( : queryNodes(std::move(_queryNodeIds)), queryTraceDir(std::move(_queryTraceDir)) {} +QueryTraceConfig::QueryTraceConfig(std::string _queryTraceDir) + : QueryTraceConfig( + std::unordered_set{}, + 
std::move(_queryTraceDir)) {} + } // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryTraceConfig.h b/velox/exec/trace/QueryTraceConfig.h index 8afcbf22fb53..0e9c22818c34 100644 --- a/velox/exec/trace/QueryTraceConfig.h +++ b/velox/exec/trace/QueryTraceConfig.h @@ -21,15 +21,17 @@ namespace facebook::velox::exec::trace { struct QueryTraceConfig { - /// Target query trace nodes + /// Target query trace nodes. std::unordered_set queryNodes; - /// Base dir of query trace, normmaly it is $prefix/$taskId. + /// Base dir of query trace. std::string queryTraceDir; QueryTraceConfig( std::unordered_set _queryNodeIds, std::string _queryTraceDir); + QueryTraceConfig(std::string _queryTraceDir); + QueryTraceConfig() = default; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryTraceUtil.cpp b/velox/exec/trace/QueryTraceUtil.cpp new file mode 100644 index 000000000000..437d3eee224e --- /dev/null +++ b/velox/exec/trace/QueryTraceUtil.cpp @@ -0,0 +1,38 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/exec/trace/QueryTraceUtil.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/file/FileSystems.h" + +namespace facebook::velox::exec::trace { + +void createTraceDirectory(const std::string& traceDir) { + try { + const auto fs = filesystems::getFileSystem(traceDir, nullptr); + if (fs->exists(traceDir)) { + fs->rmdir(traceDir); + } + fs->mkdir(traceDir); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to create trace directory '{}' with error: {}", + traceDir, + e.what()); + } +} + +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryTraceUtil.h b/velox/exec/trace/QueryTraceUtil.h new file mode 100644 index 000000000000..826aa35283a2 --- /dev/null +++ b/velox/exec/trace/QueryTraceUtil.h @@ -0,0 +1,26 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace facebook::velox::exec::trace { + +/// Creates a directory to store the query trace metadata and data.
+void createTraceDirectory(const std::string& traceDir); + +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/test/CMakeLists.txt b/velox/exec/trace/test/CMakeLists.txt index 9c4ffe9dd358..861b5ffb339a 100644 --- a/velox/exec/trace/test/CMakeLists.txt +++ b/velox/exec/trace/test/CMakeLists.txt @@ -28,4 +28,11 @@ target_link_libraries( velox_memory velox_query_trace_exec velox_query_trace_retrieve - velox_vector_fuzzer) + velox_vector_fuzzer + GTest::gtest_main + GTest::gmock + Folly::folly + gflags::gflags + glog::glog + fmt::fmt + ${FILESYSTEM}) diff --git a/velox/exec/trace/test/QueryTraceTest.cpp b/velox/exec/trace/test/QueryTraceTest.cpp index 765eeccaaf53..8b969cc1b9b6 100644 --- a/velox/exec/trace/test/QueryTraceTest.cpp +++ b/velox/exec/trace/test/QueryTraceTest.cpp @@ -28,6 +28,7 @@ #include "velox/exec/trace/QueryDataWriter.h" #include "velox/exec/trace/QueryMetadataReader.h" #include "velox/exec/trace/QueryMetadataWriter.h" +#include "velox/exec/trace/QueryTraceUtil.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/tests/utils/VectorTestBase.h" @@ -202,4 +203,169 @@ TEST_F(QueryTracerTest, traceMetadata) { ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key)); } } + +TEST_F(QueryTracerTest, task) { + const auto rowType = + ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, + {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); + std::vector rows; + constexpr auto numBatch = 1; + rows.reserve(numBatch); + for (auto i = 0; i < numBatch; ++i) { + rows.push_back(vectorFuzzer_.fuzzRow(rowType, 2)); + } + + auto planNodeIdGenerator = std::make_shared(); + const auto planNode = + PlanBuilder(planNodeIdGenerator) + .values(rows, false) + .project({"c0", "c1", "c2"}) + .hashJoin( + {"c0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator) + .values(rows, true) + .singleAggregation({"c0", "c1"}, {"min(c2)"}) + .project({"c0 AS u0", "c1 AS u1", "a0 AS u2"}) + .planNode(), + "c0 < 135", + {"c0", 
"c1", "c2"}, + core::JoinType::kInner) + .planNode(); + const auto expectedResult = + AssertQueryBuilder(planNode).maxDrivers(1).copyResults(pool()); + + for (const auto& queryTraceNodeIds : {"1,2", ""}) { + const auto outputDir = TempDirectoryPath::create(); + const auto expectedQueryConfigs = + std::unordered_map{ + {core::QueryConfig::kSpillEnabled, "true"}, + {core::QueryConfig::kSpillNumPartitionBits, "17"}, + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, outputDir->getPath()}, + {core::QueryConfig::kQueryTraceNodeIds, queryTraceNodeIds}, + {"key1", "value1"}, + }; + const auto expectedConnectorProperties = + std::unordered_map>{ + {"test_trace", + std::make_shared( + std::unordered_map{ + {"cKey1", "cVal1"}})}}; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), + core::QueryConfig(expectedQueryConfigs), + expectedConnectorProperties); + + std::shared_ptr task; + const auto result = AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool(), task); + assertEqualResults({result}, {expectedResult}); + + const auto expectedDir = + fmt::format("{}/{}", outputDir->getPath(), task->taskId()); + const auto fs = filesystems::getFileSystem(expectedDir, nullptr); + const auto actualDirs = fs->list(outputDir->getPath()); + ASSERT_EQ(actualDirs.size(), 1); + ASSERT_EQ(actualDirs.at(0), expectedDir); + + std::unordered_map actualQueryConfigs; + std:: + unordered_map> + actualConnectorProperties; + core::PlanNodePtr actualQueryPlan; + auto reader = trace::QueryMetadataReader(expectedDir, pool()); + reader.read(actualQueryConfigs, actualConnectorProperties, actualQueryPlan); + + ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); + ASSERT_EQ(actualQueryConfigs.size(), expectedQueryConfigs.size()); + for (const auto& [key, value] : actualQueryConfigs) { + ASSERT_EQ(actualQueryConfigs.at(key), expectedQueryConfigs.at(key)); + } + + ASSERT_EQ( + actualConnectorProperties.size(), 
expectedConnectorProperties.size()); + ASSERT_EQ(actualConnectorProperties.count("test_trace"), 1); + const auto expectedConnectorConfigs = + expectedConnectorProperties.at("test_trace")->rawConfigsCopy(); + const auto actualConnectorConfigs = + actualConnectorProperties.at("test_trace"); + for (const auto& [key, value] : actualConnectorConfigs) { + ASSERT_EQ( + actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key)); + } + } +} + +TEST_F(QueryTracerTest, error) { + const auto planNode = PlanBuilder().values({}).planNode(); + const auto expectedQueryConfigs = + std::unordered_map{ + {core::QueryConfig::kSpillEnabled, "true"}, + {core::QueryConfig::kSpillNumPartitionBits, "17"}, + {core::QueryConfig::kQueryTraceEnabled, "true"}, + }; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(expectedQueryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode).queryCtx(queryCtx).maxDrivers(1).copyResults( + pool()), + "Query trace enabled but the trace dir is not set"); +} + +TEST_F(QueryTracerTest, traceDir) { + const auto outputDir = TempDirectoryPath::create(); + const auto rootDir = outputDir->getPath(); + const auto fs = filesystems::getFileSystem(rootDir, nullptr); + auto dir1 = fmt::format("{}/{}", outputDir->getPath(), "t1"); + trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + + auto dir2 = fmt::format("{}/{}", dir1, "t1_1"); + trace::createTraceDirectory(dir2); + ASSERT_TRUE(fs->exists(dir2)); + + // It will remove the old dir1 along with its subdir when created the dir1 + // again. 
+ trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + ASSERT_FALSE(fs->exists(dir2)); + + const auto parentDir = fmt::format("{}/{}", outputDir->getPath(), "p"); + fs->mkdir(parentDir); + + constexpr auto numThreads = 5; + std::vector queryThreads; + queryThreads.reserve(numThreads); + std::set expectedDirs; + for (int i = 0; i < numThreads; ++i) { + // Track the expected dir on the main thread: std::set isn't thread-safe. + const auto dir = fmt::format("{}/s{}", parentDir, i); + expectedDirs.insert(dir); + queryThreads.emplace_back( + [dir]() { trace::createTraceDirectory(dir); }); + } + + for (auto& queryThread : queryThreads) { + queryThread.join(); + } + + const auto actualDirs = fs->list(parentDir); + ASSERT_EQ(actualDirs.size(), numThreads); + ASSERT_EQ(actualDirs.size(), expectedDirs.size()); + for (const auto& dir : actualDirs) { + ASSERT_EQ(expectedDirs.count(dir), 1); + } +} } // namespace facebook::velox::exec::test + +// This main is needed for some tests on linux. +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + // Signal handler required for ThreadDebugInfoTest + facebook::velox::process::addDefaultFatalSignalHandler(); + folly::Init init(&argc, &argv, false); + return RUN_ALL_TESTS(); +}