Skip to content

Commit

Permalink
Add TraceHistory (facebookincubator#8603)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#8603

This new class records a per-thread trace history; the last 16 entries are
kept for each thread.  Tracing is added to the `TraceContext` constructor and
to important lower-level locations in the reader, e.g. seeking on the column
reader and subfield reading.

Reviewed By: oerling

Differential Revision: D53198236

fbshipit-source-id: 042cdf5b2ba384cf9afd94ed5ee2167009c48a20
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 1, 2024
1 parent 6399f17 commit e0cf6e9
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 4 deletions.
2 changes: 1 addition & 1 deletion velox/common/process/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

add_library(velox_process ProcessBase.cpp StackTrace.cpp ThreadDebugInfo.cpp
TraceContext.cpp)
TraceContext.cpp TraceHistory.cpp)

target_link_libraries(
velox_process
Expand Down
7 changes: 7 additions & 0 deletions velox/common/process/TraceContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/common/process/TraceContext.h"

#include "velox/common/process/ThreadLocalRegistry.h"
#include "velox/common/process/TraceHistory.h"

#include <sstream>

Expand All @@ -36,6 +37,12 @@ TraceContext::TraceContext(std::string label, bool isTemporary)
: label_(std::move(label)),
enterTime_(std::chrono::steady_clock::now()),
isTemporary_(isTemporary) {
TraceHistory::push([&](auto& entry) {
entry.time = enterTime_;
entry.file = __FILE__;
entry.line = __LINE__;
snprintf(entry.label, entry.kLabelCapacity, "%s", label_.c_str());
});
threadLocalTraceData.withValue([&](auto& counts) {
auto& data = counts[label_];
++data.numThreads;
Expand Down
56 changes: 56 additions & 0 deletions velox/common/process/TraceHistory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/process/TraceHistory.h"

#include <folly/system/ThreadId.h>

#include <algorithm>

namespace facebook::velox::process {

namespace {
auto registry = std::make_shared<ThreadLocalRegistry<TraceHistory>>();
}

namespace detail {
// Definition of the per-thread handle that TraceHistory.h declares `extern`.
// Every thread gets its own instance, constructed from the shared `registry`
// defined above, so `registry` must be declared before this variable.
// NOTE(review): presumably constructing the reference registers the thread's
// TraceHistory with the registry and destroying it at thread exit removes it
// again -- confirm against ThreadLocalRegistry.h.
thread_local ThreadLocalRegistry<TraceHistory>::Reference traceHistory(
registry);
}

TraceHistory::TraceHistory()
: threadId_(std::this_thread::get_id()), osTid_(folly::getOSThreadID()) {}

std::vector<TraceHistory::EntriesWithThreadInfo> TraceHistory::listAll() {
std::vector<EntriesWithThreadInfo> results;
registry->forAllValues([&](auto& history) {
EntriesWithThreadInfo result;
result.threadId = history.threadId_;
result.osTid = history.osTid_;
for (int i = 0; i < kCapacity; ++i) {
const int j = (history.index_ + kCapacity - 1 - i) % kCapacity;
if (!populated(history.data_[j])) {
break;
}
result.entries.push_back(history.data_[j]);
}
std::reverse(result.entries.begin(), result.entries.end());
results.push_back(std::move(result));
});
return results;
}

} // namespace facebook::velox::process
105 changes: 105 additions & 0 deletions velox/common/process/TraceHistory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/common/process/ThreadLocalRegistry.h"

#include <cassert>
#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>

/// Push an entry to the calling thread's history ring buffer with a label
/// built from a format string (same conventions as printf) and optional
/// arguments.
///
/// The entry records:
///  - `time`:  std::chrono::steady_clock::now() at the moment of the push;
///  - `file` / `line`: __FILE__ / __LINE__ of the macro expansion site;
///  - `label`: the formatted text, written with snprintf bounded by
///    `Entry::kLabelCapacity`, so longer labels are truncated to fit
///    (including the terminating NUL).
///
/// The format arguments are evaluated inside a lambda that captures by
/// reference, so they may name any local variable visible at the call site.
/// `, ##__VA_ARGS__` is the GNU extension that drops the trailing comma when
/// no optional arguments are given.
///
/// Example: VELOX_TRACE_HISTORY_PUSH("seekTo %d", offset);
#define VELOX_TRACE_HISTORY_PUSH(_format, ...) \
::facebook::velox::process::TraceHistory::push([&](auto& entry) { \
entry.time = ::std::chrono::steady_clock::now(); \
entry.file = __FILE__; \
entry.line = __LINE__; \
::snprintf(entry.label, entry.kLabelCapacity, _format, ##__VA_ARGS__); \
})

namespace facebook::velox::process {

// Forward declaration so that ThreadLocalRegistry<TraceHistory> can be named
// in the declaration below, before the class itself is defined.
class TraceHistory;

namespace detail {
/// Per-thread handle through which TraceHistory::push() reaches the calling
/// thread's TraceHistory instance.  Only declared here (`extern`); the
/// definition lives in a source file.  Implementation detail -- not meant to
/// be used directly by callers.
extern thread_local ThreadLocalRegistry<TraceHistory>::Reference traceHistory;
}

/// Fixed-size, thread-local ring buffer of recent trace entries.  Every thread
/// writes only to its own buffer through push(); listAll() collects the
/// buffers of all threads for inspection.
class TraceHistory {
 public:
  /// Records the ids of the constructing thread.
  TraceHistory();

  /// A single trace point: when and where it was recorded, plus a short
  /// free-form label.
  struct Entry {
    /// Time at which the entry was recorded.
    std::chrono::steady_clock::time_point time;
    /// Source file of the trace point.  A null pointer marks a slot that has
    /// never been written.
    const char* file;
    /// Source line of the trace point.
    int32_t line;

    /// Size of `label` in bytes (including the terminating NUL): whatever is
    /// left of a 64-byte entry after the fixed fields above.
    static constexpr int kLabelCapacity =
        64 - sizeof(time) - sizeof(file) - sizeof(line);
    /// NUL-terminated label text.
    char label[kLabelCapacity];
  };

  /// NOTE: prefer VELOX_TRACE_HISTORY_PUSH over calling this directly.
  ///
  /// Appends an entry to the calling thread's history.  Once `kCapacity'
  /// entries have been recorded, each new entry overwrites the oldest one.
  /// The functor `init' receives the slot to fill and must perform every
  /// mutation of the new entry; in particular it must set `file', which is
  /// what marks the slot as populated.
  template <typename F>
  static void push(F&& init) {
    detail::traceHistory.withValue(
        [init = std::forward<F>(init)](auto& history) {
          auto& slot = history.data_[history.index_];
          init(slot);
          assert(populated(slot));
          // Advance the write position, wrapping around at the end.
          const int next = history.index_ + 1;
          history.index_ = next % kCapacity;
        });
  }

  /// The recorded entries of one thread together with that thread's ids.
  struct EntriesWithThreadInfo {
    std::thread::id threadId;
    uint64_t osTid;
    std::vector<Entry> entries;
  };

  /// Returns the entries of all threads, one element per thread.
  static std::vector<EntriesWithThreadInfo> listAll();

  /// Number of most recent entries retained per thread.  Must be a power of
  /// 2.
  static constexpr int kCapacity = 16;

 private:
  // Compile-time checks of the invariants documented above.
  static_assert((kCapacity & (kCapacity - 1)) == 0);
  static_assert(sizeof(Entry) == 64);

  // A slot counts as written once its `file` pointer has been set.
  static bool populated(const Entry& entry) {
    return entry.file != nullptr;
  }

  // Ring buffer storage.  Value-initialized so that every unused slot starts
  // with a null `file`; aligned to 64 bytes, matching the entry size.
  alignas(64) Entry data_[kCapacity]{};
  const std::thread::id threadId_;
  const uint64_t osTid_;
  // Slot that the next push() will write to.
  int index_ = 0;
};

} // namespace facebook::velox::process
5 changes: 3 additions & 2 deletions velox/common/process/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(velox_process_test TraceContextTest.cpp
ThreadLocalRegistryTest.cpp)
add_executable(
velox_process_test TraceContextTest.cpp ThreadLocalRegistryTest.cpp
TraceHistoryTest.cpp)

add_test(velox_process_test velox_process_test)

Expand Down
16 changes: 16 additions & 0 deletions velox/common/process/tests/TraceContextTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/common/process/TraceContext.h"
#include "velox/common/process/TraceHistory.h"

#include <fmt/format.h>
#include <folly/synchronization/Baton.h>
Expand Down Expand Up @@ -79,5 +80,20 @@ TEST_F(TraceContextTest, basic) {
}
}

// Checks that constructing a TraceContext also records its label in the
// thread's TraceHistory, and that a label longer than the entry capacity is
// truncated rather than overflowing.
TEST_F(TraceContextTest, traceHistory) {
  // Runs on a dedicated thread; the assertions below expect this to be the
  // only thread with a recorded history.
  std::thread worker([] {
    constexpr int kLabelCapacity = TraceHistory::Entry::kLabelCapacity;
    TraceContext shortTrace("test");
    TraceContext longTrace(std::string(kLabelCapacity + 10, 'x'));

    const auto histories = TraceHistory::listAll();
    ASSERT_EQ(histories.size(), 1);
    const auto& entries = histories[0].entries;
    ASSERT_EQ(entries.size(), 2);
    ASSERT_STREQ(entries[0].label, "test");
    // One byte of the label buffer is taken by the terminating NUL.
    ASSERT_EQ(entries[1].label, std::string(kLabelCapacity - 1, 'x'));
  });
  worker.join();
}

} // namespace
} // namespace facebook::velox::process
127 changes: 127 additions & 0 deletions velox/common/process/tests/TraceHistoryTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/process/TraceHistory.h"

#include <fmt/format.h>
#include <folly/synchronization/Baton.h>
#include <folly/synchronization/Latch.h>
#include <gtest/gtest.h>

namespace facebook::velox::process {
namespace {

class TraceHistoryTest : public testing::Test {
public:
void SetUp() override {
ASSERT_TRUE(TraceHistory::listAll().empty());
}

void TearDown() override {
ASSERT_TRUE(TraceHistory::listAll().empty());
}
};

// Pushes more entries than the ring buffer holds and checks that exactly the
// newest `kCapacity` entries survive, in order, with correct metadata.
TEST_F(TraceHistoryTest, basic) {
  std::thread worker([] {
    // Number of pushes beyond the buffer capacity; these overwrite the oldest
    // entries, so the first surviving label is "Test <kNumOverwritten>".
    constexpr int kNumOverwritten = 10;
    const char* const kThisFile = "TraceHistoryTest.cpp";
    const auto begin = std::chrono::steady_clock::now();
    // NOTE: the push must stay exactly two lines below kStartLine.
    constexpr int kStartLine = __LINE__;
    for (int i = 0; i < TraceHistory::kCapacity + kNumOverwritten; ++i) {
      VELOX_TRACE_HISTORY_PUSH("Test %d", i);
    }
    const auto end = std::chrono::steady_clock::now();

    const auto histories = TraceHistory::listAll();
    ASSERT_EQ(histories.size(), 1);
    const auto& history = histories[0];
    ASSERT_EQ(history.threadId, std::this_thread::get_id());
    ASSERT_EQ(history.osTid, folly::getOSThreadID());
    ASSERT_EQ(history.entries.size(), TraceHistory::kCapacity);

    auto previous = begin;
    int expectedValue = kNumOverwritten;
    for (const auto& entry : history.entries) {
      ASSERT_EQ(entry.line, kStartLine + 2);
      // Compare only the tail of the path: __FILE__ may carry a directory
      // prefix that depends on the build.
      ASSERT_STREQ(
          entry.file + strlen(entry.file) - strlen(kThisFile), kThisFile);
      // Timestamps never go backwards from one entry to the next.
      ASSERT_LE(previous, entry.time);
      previous = entry.time;
      ASSERT_EQ(strncmp(entry.label, "Test ", 5), 0);
      ASSERT_EQ(atoi(entry.label + 5), expectedValue);
      ++expectedValue;
    }
    ASSERT_LE(previous, end);
  });
  worker.join();
}

// Checks that listAll() reports one history per live thread containing the
// entries that thread pushed, and that a thread's history is no longer
// reported once the thread has exited.
TEST_F(TraceHistoryTest, multiThread) {
  constexpr int kNumThreads = 3;
  folly::Latch latch(kNumThreads);
  folly::Baton<> batons[kNumThreads];
  std::vector<std::thread> threads;

  // A failing ASSERT_* returns from this function immediately.  Without this
  // guard the workers would still be blocked on their batons and `threads`
  // would be destroyed while joinable, which calls std::terminate and hides
  // the real failure.  The guard is declared after `batons` and `threads` so
  // that it runs before either of them is destroyed.
  struct WorkerReleaser {
    folly::Baton<>* batons_;
    std::vector<std::thread>* threads_;

    ~WorkerReleaser() {
      for (int i = 0; i < static_cast<int>(threads_->size()); ++i) {
        auto& worker = (*threads_)[i];
        // The test body always posts a baton and joins its thread together,
        // so a still-joinable worker is one whose baton was never posted.
        if (worker.joinable()) {
          batons_[i].post();
          worker.join();
        }
      }
    }
  } releaser{batons, &threads};

  auto timeLow = std::chrono::steady_clock::now();
  // NOTE: the two pushes must stay exactly three and four lines below
  // kStartLine.
  constexpr int kStartLine = __LINE__;
  for (int i = 0; i < kNumThreads; ++i) {
    threads.emplace_back([&, i] {
      VELOX_TRACE_HISTORY_PUSH("Test");
      VELOX_TRACE_HISTORY_PUSH("Test %d", i);
      latch.count_down();
      // Stay alive until the main thread has inspected the history.
      batons[i].wait();
    });
  }
  // Wait until every worker has pushed both of its entries.
  latch.wait();
  auto timeHigh = std::chrono::steady_clock::now();
  auto results = TraceHistory::listAll();
  ASSERT_EQ(results.size(), kNumThreads);
  for (auto& result : results) {
    // Map the reported thread id back to the worker's index, which is the
    // value that worker formatted into its second label.
    auto threadIndex =
        std::find_if(
            threads.begin(),
            threads.end(),
            [&](auto& t) { return t.get_id() == result.threadId; }) -
        threads.begin();
    ASSERT_EQ(result.entries.size(), 2);
    ASSERT_EQ(result.entries[0].line, kStartLine + 3);
    ASSERT_EQ(result.entries[1].line, kStartLine + 4);
    ASSERT_STREQ(result.entries[0].label, "Test");
    ASSERT_EQ(result.entries[1].label, fmt::format("Test {}", threadIndex));
    for (auto& entry : result.entries) {
      ASSERT_LE(timeLow, entry.time);
      ASSERT_LE(entry.time, timeHigh);
      ASSERT_TRUE(entry.file);
      ASSERT_STREQ(
          entry.file + strlen(entry.file) - 20, "TraceHistoryTest.cpp");
    }
  }
  // Release the workers one at a time; each exit must remove exactly one
  // history from the listing.
  for (int i = 0; i < kNumThreads; ++i) {
    ASSERT_EQ(TraceHistory::listAll().size(), kNumThreads - i);
    batons[i].post();
    threads[i].join();
  }
}

// Checks that a label longer than the entry's label buffer is truncated to
// fit instead of overflowing it.
TEST_F(TraceHistoryTest, largeLabel) {
  std::thread worker([] {
    constexpr int kLabelCapacity = TraceHistory::Entry::kLabelCapacity;
    const std::string oversized(kLabelCapacity + 10, 'x');
    VELOX_TRACE_HISTORY_PUSH("%s", oversized.c_str());

    const auto histories = TraceHistory::listAll();
    ASSERT_EQ(histories.size(), 1);
    const auto& entries = histories[0].entries;
    ASSERT_EQ(entries.size(), 1);
    // One byte of the label buffer is taken by the terminating NUL.
    ASSERT_EQ(entries[0].label, std::string(kLabelCapacity - 1, 'x'));
  });
  worker.join();
}

} // namespace
} // namespace facebook::velox::process
1 change: 1 addition & 0 deletions velox/dwio/common/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const std::vector<SelectiveColumnReader*>& SelectiveColumnReader::children()
}

void SelectiveColumnReader::seekTo(vector_size_t offset, bool readsNullsOnly) {
VELOX_TRACE_HISTORY_PUSH("seekTo %d %d", offset, readsNullsOnly);
if (offset == readOffset_) {
return;
}
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/common/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/common/base/RawVector.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/process/ProcessBase.h"
#include "velox/common/process/TraceHistory.h"
#include "velox/dwio/common/ColumnSelector.h"
#include "velox/dwio/common/FormatData.h"
#include "velox/dwio/common/IntDecoder.h"
Expand Down Expand Up @@ -189,7 +190,8 @@ class SelectiveColumnReader {
// group. Interpretation of 'index' depends on format. Clears counts
// of skipped enclosing struct nulls for formats where nulls are
// recorded at each nesting level, i.e. not rep-def.
virtual void seekToRowGroup(uint32_t /*index*/) {
// Base implementation: records the seek in the thread's trace history and
// resets the parent-null counters to zero.
virtual void seekToRowGroup(uint32_t index) {
  VELOX_TRACE_HISTORY_PUSH("seekToRowGroup %u", index);
  parentNullsRecordedTo_ = 0;
  numParentNulls_ = 0;
}
Expand Down
Loading

0 comments on commit e0cf6e9

Please sign in to comment.