From e0cf6e90687ac727c5a76b78d0727bfd1840b2d8 Mon Sep 17 00:00:00 2001
From: Jimmy Lu
Date: Thu, 1 Feb 2024 15:50:41 -0800
Subject: [PATCH] Add TraceHistory (#8603)

Summary:
Pull Request resolved: https://github.com/facebookincubator/velox/pull/8603

The new class is created for tracing the history. Last 16 values are kept per thread. Add the tracing to each `TraceContext` constructor, and also lower level important locations in the reader, e.g. seeking on the column reader and subfield reading.

Reviewed By: oerling

Differential Revision: D53198236

fbshipit-source-id: 042cdf5b2ba384cf9afd94ed5ee2167009c48a20
---
NOTE(review): this patch was recovered from a whitespace-collapsed copy in
which every `<...>` span had been stripped. Line breaks were rebuilt to match
the hunk header counts. Template arguments and system include names were
re-derived from how the code uses them and must be confirmed against the
upstream commit. The author e-mail on the From: line could not be recovered.

 velox/common/process/CMakeLists.txt           |   2 +-
 velox/common/process/TraceContext.cpp         |   7 +
 velox/common/process/TraceHistory.cpp         |  56 ++++++++
 velox/common/process/TraceHistory.h           | 105 +++++++++++++++
 velox/common/process/tests/CMakeLists.txt     |   5 +-
 .../common/process/tests/TraceContextTest.cpp |  16 +++
 .../common/process/tests/TraceHistoryTest.cpp | 127 ++++++++++++++++++
 velox/dwio/common/SelectiveColumnReader.cpp   |   1 +
 velox/dwio/common/SelectiveColumnReader.h     |   4 +-
 .../common/SelectiveStructColumnReader.cpp    |   2 +
 10 files changed, 321 insertions(+), 4 deletions(-)
 create mode 100644 velox/common/process/TraceHistory.cpp
 create mode 100644 velox/common/process/TraceHistory.h
 create mode 100644 velox/common/process/tests/TraceHistoryTest.cpp

diff --git a/velox/common/process/CMakeLists.txt b/velox/common/process/CMakeLists.txt
index 22182ed58f12..af0bedd5ce4f 100644
--- a/velox/common/process/CMakeLists.txt
+++ b/velox/common/process/CMakeLists.txt
@@ -13,7 +13,7 @@
 # limitations under the License.

 add_library(velox_process ProcessBase.cpp StackTrace.cpp ThreadDebugInfo.cpp
-            TraceContext.cpp)
+            TraceContext.cpp TraceHistory.cpp)

 target_link_libraries(
   velox_process
diff --git a/velox/common/process/TraceContext.cpp b/velox/common/process/TraceContext.cpp
index 78c8406d5f81..baad284a993c 100644
--- a/velox/common/process/TraceContext.cpp
+++ b/velox/common/process/TraceContext.cpp
@@ -17,6 +17,7 @@
 #include "velox/common/process/TraceContext.h"

 #include "velox/common/process/ThreadLocalRegistry.h"
+#include "velox/common/process/TraceHistory.h"

 #include <sstream>

@@ -36,6 +37,12 @@ TraceContext::TraceContext(std::string label, bool isTemporary)
     : label_(std::move(label)),
       enterTime_(std::chrono::steady_clock::now()),
       isTemporary_(isTemporary) {
+  TraceHistory::push([&](auto& entry) {
+    entry.time = enterTime_;
+    entry.file = __FILE__;
+    entry.line = __LINE__;
+    snprintf(entry.label, entry.kLabelCapacity, "%s", label_.c_str());
+  });
   threadLocalTraceData.withValue([&](auto& counts) {
     auto& data = counts[label_];
     ++data.numThreads;
diff --git a/velox/common/process/TraceHistory.cpp b/velox/common/process/TraceHistory.cpp
new file mode 100644
index 000000000000..bf7524590802
--- /dev/null
+++ b/velox/common/process/TraceHistory.cpp
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/common/process/TraceHistory.h"
+
+#include <folly/system/ThreadId.h>
+
+#include <algorithm>
+
+namespace facebook::velox::process {
+
+namespace {
+auto registry = std::make_shared<ThreadLocalRegistry<TraceHistory>>();
+}
+
+namespace detail {
+thread_local ThreadLocalRegistry<TraceHistory>::Reference traceHistory(
+    registry);
+}
+
+TraceHistory::TraceHistory()
+    : threadId_(std::this_thread::get_id()), osTid_(folly::getOSThreadID()) {}
+
+std::vector<TraceHistory::EntriesWithThreadInfo> TraceHistory::listAll() {
+  std::vector<EntriesWithThreadInfo> results;
+  registry->forAllValues([&](auto& history) {
+    EntriesWithThreadInfo result;
+    result.threadId = history.threadId_;
+    result.osTid = history.osTid_;
+    for (int i = 0; i < kCapacity; ++i) {
+      const int j = (history.index_ + kCapacity - 1 - i) % kCapacity;
+      if (!populated(history.data_[j])) {
+        break;
+      }
+      result.entries.push_back(history.data_[j]);
+    }
+    std::reverse(result.entries.begin(), result.entries.end());
+    results.push_back(std::move(result));
+  });
+  return results;
+}
+
+} // namespace facebook::velox::process
diff --git a/velox/common/process/TraceHistory.h b/velox/common/process/TraceHistory.h
new file mode 100644
index 000000000000..bcee2cec69d7
--- /dev/null
+++ b/velox/common/process/TraceHistory.h
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/common/process/ThreadLocalRegistry.h"
+
+#include <cassert>
+#include <chrono>
+#include <cstdio>
+#include <thread>
+#include <vector>
+
+/// Push an entry to the history ring buffer with a label from format string
+/// (same as printf) and optional arguments.
+#define VELOX_TRACE_HISTORY_PUSH(_format, ...)                             \
+  ::facebook::velox::process::TraceHistory::push([&](auto& entry) {        \
+    entry.time = ::std::chrono::steady_clock::now();                       \
+    entry.file = __FILE__;                                                 \
+    entry.line = __LINE__;                                                 \
+    ::snprintf(entry.label, entry.kLabelCapacity, _format, ##__VA_ARGS__); \
+  })
+
+namespace facebook::velox::process {
+
+class TraceHistory;
+
+namespace detail {
+extern thread_local ThreadLocalRegistry<TraceHistory>::Reference traceHistory;
+}
+
+/// Keep list of labels in a ring buffer that is fixed sized and thread local.
+class TraceHistory {
+ public:
+  TraceHistory();
+
+  /// An entry with tracing information and custom label.
+  struct Entry {
+    std::chrono::steady_clock::time_point time;
+    const char* file;
+    int32_t line;
+
+    static constexpr int kLabelCapacity =
+        64 - sizeof(time) - sizeof(file) - sizeof(line);
+    char label[kLabelCapacity];
+  };
+
+  /// NOTE: usually VELOX_TRACE_HISTORY_PUSH should be used instead of calling
+  /// this function directly.
+  ///
+  /// Add a new entry to the thread local instance. If there are more than
+  /// `kCapacity' entries, overwrite the oldest ones. All the mutation on the
+  /// new entry should be done in the functor `init'.
+  template <typename F>
+  static void push(F&& init) {
+    detail::traceHistory.withValue(
+        [init = std::forward<F>(init)](auto& history) {
+          auto& entry = history.data_[history.index_];
+          init(entry);
+          assert(populated(entry));
+          history.index_ = (history.index_ + 1) % kCapacity;
+        });
+  }
+
+  /// All entries in a specific thread.
+  struct EntriesWithThreadInfo {
+    std::thread::id threadId;
+    uint64_t osTid;
+    std::vector<Entry> entries;
+  };
+
+  /// List all entries from all threads.
+  static std::vector<EntriesWithThreadInfo> listAll();
+
+  /// Keep the last `kCapacity' entries per thread. Must be a power of 2.
+  static constexpr int kCapacity = 16;
+
+ private:
+  static_assert((kCapacity & (kCapacity - 1)) == 0);
+  static_assert(sizeof(Entry) == 64);
+
+  static bool populated(const Entry& entry) {
+    return entry.file != nullptr;
+  }
+
+  alignas(64) Entry data_[kCapacity]{};
+  const std::thread::id threadId_;
+  const uint64_t osTid_;
+  int index_ = 0;
+};
+
+} // namespace facebook::velox::process
diff --git a/velox/common/process/tests/CMakeLists.txt b/velox/common/process/tests/CMakeLists.txt
index 836e397466a2..2fce354e31ec 100644
--- a/velox/common/process/tests/CMakeLists.txt
+++ b/velox/common/process/tests/CMakeLists.txt
@@ -12,8 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-add_executable(velox_process_test TraceContextTest.cpp
-               ThreadLocalRegistryTest.cpp)
+add_executable(
+  velox_process_test TraceContextTest.cpp ThreadLocalRegistryTest.cpp
+  TraceHistoryTest.cpp)

 add_test(velox_process_test velox_process_test)

diff --git a/velox/common/process/tests/TraceContextTest.cpp b/velox/common/process/tests/TraceContextTest.cpp
index 9e958545a32f..77dd46c1c389 100644
--- a/velox/common/process/tests/TraceContextTest.cpp
+++ b/velox/common/process/tests/TraceContextTest.cpp
@@ -15,6 +15,7 @@
  */

 #include "velox/common/process/TraceContext.h"
+#include "velox/common/process/TraceHistory.h"

 #include <fmt/format.h>
 #include <folly/synchronization/Baton.h>
@@ -79,5 +80,20 @@ TEST_F(TraceContextTest, basic) {
   }
 }

+TEST_F(TraceContextTest, traceHistory) {
+  std::thread([] {
+    TraceContext trace("test");
+    TraceContext trace2(
+        std::string(TraceHistory::Entry::kLabelCapacity + 10, 'x'));
+    auto results = TraceHistory::listAll();
+    ASSERT_EQ(results.size(), 1);
+    ASSERT_EQ(results[0].entries.size(), 2);
+    ASSERT_STREQ(results[0].entries[0].label, "test");
+    ASSERT_EQ(
+        results[0].entries[1].label,
+        std::string(TraceHistory::Entry::kLabelCapacity - 1, 'x'));
+  }).join();
+}
+
 } // namespace
 } // namespace facebook::velox::process
diff --git a/velox/common/process/tests/TraceHistoryTest.cpp b/velox/common/process/tests/TraceHistoryTest.cpp
new file mode 100644
index 000000000000..754fe6f389c3
--- /dev/null
+++ b/velox/common/process/tests/TraceHistoryTest.cpp
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/common/process/TraceHistory.h"
+
+#include <fmt/format.h>
+#include <folly/synchronization/Baton.h>
+#include <folly/synchronization/Latch.h>
+#include <gtest/gtest.h>
+
+namespace facebook::velox::process {
+namespace {
+
+class TraceHistoryTest : public testing::Test {
+ public:
+  void SetUp() override {
+    ASSERT_TRUE(TraceHistory::listAll().empty());
+  }
+
+  void TearDown() override {
+    ASSERT_TRUE(TraceHistory::listAll().empty());
+  }
+};
+
+TEST_F(TraceHistoryTest, basic) {
+  std::thread([] {
+    auto timeLow = std::chrono::steady_clock::now();
+    constexpr int kStartLine = __LINE__;
+    for (int i = 0; i < TraceHistory::kCapacity + 10; ++i) {
+      VELOX_TRACE_HISTORY_PUSH("Test %d", i);
+    }
+    auto timeHigh = std::chrono::steady_clock::now();
+    auto results = TraceHistory::listAll();
+    ASSERT_EQ(results.size(), 1);
+    ASSERT_EQ(results[0].threadId, std::this_thread::get_id());
+    ASSERT_EQ(results[0].osTid, folly::getOSThreadID());
+    ASSERT_EQ(results[0].entries.size(), TraceHistory::kCapacity);
+    auto lastTime = timeLow;
+    for (int i = 0; i < TraceHistory::kCapacity; ++i) {
+      auto& entry = results[0].entries[i];
+      ASSERT_EQ(entry.line, kStartLine + 2);
+      ASSERT_STREQ(
+          entry.file + strlen(entry.file) - 20, "TraceHistoryTest.cpp");
+      ASSERT_LE(lastTime, entry.time);
+      lastTime = entry.time;
+      ASSERT_EQ(strncmp(entry.label, "Test ", 5), 0);
+      ASSERT_EQ(atoi(entry.label + 5), i + 10);
+    }
+    ASSERT_LE(lastTime, timeHigh);
+  }).join();
+}
+
+TEST_F(TraceHistoryTest, multiThread) {
+  constexpr int kNumThreads = 3;
+  folly::Latch latch(kNumThreads);
+  folly::Baton<> batons[kNumThreads];
+  std::vector<std::thread> threads;
+  auto timeLow = std::chrono::steady_clock::now();
+  constexpr int kStartLine = __LINE__;
+  for (int i = 0; i < kNumThreads; ++i) {
+    threads.emplace_back([&, i] {
+      VELOX_TRACE_HISTORY_PUSH("Test");
+      VELOX_TRACE_HISTORY_PUSH("Test %d", i);
+      latch.count_down();
+      batons[i].wait();
+    });
+  }
+  latch.wait();
+  auto timeHigh = std::chrono::steady_clock::now();
+  auto results = TraceHistory::listAll();
+  ASSERT_EQ(results.size(), kNumThreads);
+  for (auto& result : results) {
+    auto threadIndex =
+        std::find_if(
+            threads.begin(),
+            threads.end(),
+            [&](auto& t) { return t.get_id() == result.threadId; }) -
+        threads.begin();
+    ASSERT_EQ(result.entries.size(), 2);
+    ASSERT_EQ(result.entries[0].line, kStartLine + 3);
+    ASSERT_EQ(result.entries[1].line, kStartLine + 4);
+    ASSERT_STREQ(result.entries[0].label, "Test");
+    ASSERT_EQ(result.entries[1].label, fmt::format("Test {}", threadIndex));
+    for (auto& entry : result.entries) {
+      ASSERT_LE(timeLow, entry.time);
+      ASSERT_LE(entry.time, timeHigh);
+      ASSERT_TRUE(entry.file);
+      ASSERT_STREQ(
+          entry.file + strlen(entry.file) - 20, "TraceHistoryTest.cpp");
+    }
+  }
+  for (int i = 0; i < kNumThreads; ++i) {
+    ASSERT_EQ(TraceHistory::listAll().size(), kNumThreads - i);
+    batons[i].post();
+    threads[i].join();
+  }
+}
+
+TEST_F(TraceHistoryTest, largeLabel) {
+  std::thread([] {
+    VELOX_TRACE_HISTORY_PUSH(
+        "%s",
+        std::string(TraceHistory::Entry::kLabelCapacity + 10, 'x').c_str());
+    auto results = TraceHistory::listAll();
+    ASSERT_EQ(results.size(), 1);
+    ASSERT_EQ(results[0].entries.size(), 1);
+    ASSERT_EQ(
+        results[0].entries[0].label,
+        std::string(TraceHistory::Entry::kLabelCapacity - 1, 'x'));
+  }).join();
+}
+
+} // namespace
+} // namespace facebook::velox::process
diff --git a/velox/dwio/common/SelectiveColumnReader.cpp b/velox/dwio/common/SelectiveColumnReader.cpp
index f2c157ff9c7e..25aff8eb42c3 100644
--- a/velox/dwio/common/SelectiveColumnReader.cpp
+++ b/velox/dwio/common/SelectiveColumnReader.cpp
@@ -66,6 +66,7 @@ const std::vector<SelectiveColumnReader*>& SelectiveColumnReader::children()
 }

 void SelectiveColumnReader::seekTo(vector_size_t offset, bool readsNullsOnly) {
+  VELOX_TRACE_HISTORY_PUSH("seekTo %d %d", offset, readsNullsOnly);
   if (offset == readOffset_) {
     return;
   }
diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h
index 08740a139914..4b26b0d4d60c 100644
--- a/velox/dwio/common/SelectiveColumnReader.h
+++ b/velox/dwio/common/SelectiveColumnReader.h
@@ -18,6 +18,7 @@
 #include "velox/common/base/RawVector.h"
 #include "velox/common/memory/Memory.h"
 #include "velox/common/process/ProcessBase.h"
+#include "velox/common/process/TraceHistory.h"
 #include "velox/dwio/common/ColumnSelector.h"
 #include "velox/dwio/common/FormatData.h"
 #include "velox/dwio/common/IntDecoder.h"
@@ -189,7 +190,8 @@ class SelectiveColumnReader {
   // group. Interpretation of 'index' depends on format. Clears counts
   // of skipped enclosing struct nulls for formats where nulls are
   // recorded at each nesting level, i.e. not rep-def.
-  virtual void seekToRowGroup(uint32_t /*index*/) {
+  virtual void seekToRowGroup(uint32_t index) {
+    VELOX_TRACE_HISTORY_PUSH("seekToRowGroup %u", index);
     numParentNulls_ = 0;
     parentNullsRecordedTo_ = 0;
   }
diff --git a/velox/dwio/common/SelectiveStructColumnReader.cpp b/velox/dwio/common/SelectiveStructColumnReader.cpp
index 71ad3c1c155f..1f07f73351e3 100644
--- a/velox/dwio/common/SelectiveStructColumnReader.cpp
+++ b/velox/dwio/common/SelectiveStructColumnReader.cpp
@@ -138,6 +138,7 @@ void SelectiveStructColumnReaderBase::read(
   VELOX_CHECK(!childSpecs.empty());
   for (size_t i = 0; i < childSpecs.size(); ++i) {
     auto& childSpec = childSpecs[i];
+    VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
     if (isChildConstant(*childSpec)) {
       continue;
     }
@@ -341,6 +342,7 @@ void SelectiveStructColumnReaderBase::getValues(
   }
   bool lazyPrepared = false;
   for (auto& childSpec : scanSpec_->children()) {
+    VELOX_TRACE_HISTORY_PUSH("getValues %s", childSpec->fieldName().c_str());
     if (!childSpec->projectOut()) {
       continue;
     }