Skip to content

Commit

Permalink
tprobes: ftrace: track last event timestamp per data source
Browse files Browse the repository at this point in the history
The current implementation can be pessimistic if there are multiple
concurrent data sources, one of which is only interested in sparse
events (imagine a print filter and one matching event every minute,
while the buffers are read - advancing the last read timestamp -
multiple times per second).

So instead, track the timestamp for each {data_source, cpu} combination
separately. I've introduced a new proto field id to ease manual
debugging should there be further issues introduced by this change.
However for now, the processing will treat the two fields
interchangeably.

Bug: 344969928
Change-Id: Ia95e81053f4b26c3244ebab49e083d09ca757f88
  • Loading branch information
rsavitski committed Aug 19, 2024
1 parent a200086 commit 6f9673a
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 72 deletions.
17 changes: 12 additions & 5 deletions protos/perfetto/trace/ftrace/ftrace_event_bundle.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,22 @@ message FtraceEventBundle {
}
repeated FtraceError error = 8;

// The timestamp (ftrace clock) of the last event consumed from this per-cpu
// kernel buffer prior to starting this bundle. In other words: the last
// event in the previous bundle.
// Superseded by |previous_bundle_end_timestamp| in perfetto v47+. The
// primary difference is that this field tracked the last timestamp read from
// the per-cpu buffer, while the newer field tracks events that get
// serialised into the trace.
// Added in: perfetto v44.
optional uint64 last_read_event_timestamp = 9;

// The timestamp (using ftrace clock) of the last event written into this
// data source on this cpu. In other words: the last event in the previous
// bundle.
// Lets the trace processing find an initial timestamp after which ftrace
// data is known to be valid across all cpus. Of particular importance when
// the perfetto trace buffer is a ring buffer as well, as the overwriting of
// oldest bundles can skew the first valid timestamp per cpu significantly.
// Added in: perfetto v44.
optional uint64 last_read_event_timestamp = 9;
// Added in: perfetto v47.
optional uint64 previous_bundle_end_timestamp = 10;
}

enum FtraceClock {
Expand Down
17 changes: 12 additions & 5 deletions protos/perfetto/trace/perfetto_trace.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11316,15 +11316,22 @@ message FtraceEventBundle {
}
repeated FtraceError error = 8;

// The timestamp (ftrace clock) of the last event consumed from this per-cpu
// kernel buffer prior to starting this bundle. In other words: the last
// event in the previous bundle.
// Superseded by |previous_bundle_end_timestamp| in perfetto v47+. The
// primary difference is that this field tracked the last timestamp read from
// the per-cpu buffer, while the newer field tracks events that get
// serialised into the trace.
// Added in: perfetto v44.
optional uint64 last_read_event_timestamp = 9;

// The timestamp (using ftrace clock) of the last event written into this
// data source on this cpu. In other words: the last event in the previous
// bundle.
// Lets the trace processing find an initial timestamp after which ftrace
// data is known to be valid across all cpus. Of particular importance when
// the perfetto trace buffer is a ring buffer as well, as the overwriting of
// oldest bundles can skew the first valid timestamp per cpu significantly.
// Added in: perfetto v44.
optional uint64 last_read_event_timestamp = 9;
// Added in: perfetto v47.
optional uint64 previous_bundle_end_timestamp = 10;
}

enum FtraceClock {
Expand Down
64 changes: 29 additions & 35 deletions src/traced/probes/ftrace/cpu_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,27 +292,24 @@ size_t CpuReader::ReadAndProcessBatch(
}
} // end of metatrace::FTRACE_CPU_READ_BATCH

// Parse the pages and write to the trace for all relevant data
// sources.
// Parse the pages and write to the trace for all relevant data sources.
if (pages_read == 0)
return pages_read;

uint64_t last_read_ts = last_read_event_ts_;
for (FtraceDataSource* data_source : started_data_sources) {
last_read_ts = last_read_event_ts_;
ProcessPagesForDataSource(
data_source->trace_writer(), data_source->mutable_metadata(), cpu_,
data_source->parsing_config(), data_source->mutable_parse_errors(),
&last_read_ts, parsing_buf, pages_read, compact_sched_buf, table_,
symbolizer_, ftrace_clock_snapshot_, ftrace_clock_);
data_source->mutable_bundle_end_timestamp(cpu_), parsing_buf,
pages_read, compact_sched_buf, table_, symbolizer_,
ftrace_clock_snapshot_, ftrace_clock_);
}
last_read_event_ts_ = last_read_ts;

return pages_read;
}

void CpuReader::Bundler::StartNewPacket(bool lost_events,
uint64_t last_read_event_timestamp) {
void CpuReader::Bundler::StartNewPacket(
bool lost_events,
uint64_t previous_bundle_end_timestamp) {
FinalizeAndRunSymbolizer();
packet_ = trace_writer_->NewTracePacket();
bundle_ = packet_->set_ftrace_events();
Expand All @@ -325,7 +322,7 @@ void CpuReader::Bundler::StartNewPacket(bool lost_events,
// note: set-to-zero is valid and expected for the first bundle per cpu
// (outside of concurrent tracing), with the effective meaning of "all data is
// valid since the data source was started".
bundle_->set_last_read_event_timestamp(last_read_event_timestamp);
bundle_->set_previous_bundle_end_timestamp(previous_bundle_end_timestamp);

if (ftrace_clock_) {
bundle_->set_ftrace_clock(ftrace_clock_);
Expand Down Expand Up @@ -410,21 +407,14 @@ void CpuReader::Bundler::FinalizeAndRunSymbolizer() {
// event bundle proto with a timestamp, letting the trace processor decide
// whether to discard or keep the post-error data. Previously, we crashed as
// soon as we encountered such an error.
// TODO(rsavitski, b/192586066): consider moving last_read_event_ts tracking to
// be per-datasource. The current implementation can be pessimistic if there are
// multiple concurrent data sources, one of which is only interested in sparse
// events (imagine a print filter and one matching event every minute, while the
// buffers are read - advancing the last read timestamp - multiple times per
// second). Tracking the timestamp of the last event *written into the
// datasource* can be more accurate.
// static
bool CpuReader::ProcessPagesForDataSource(
TraceWriter* trace_writer,
FtraceMetadata* metadata,
size_t cpu,
const FtraceDataSourceConfig* ds_config,
base::FlatSet<protos::pbzero::FtraceParseStatus>* parse_errors,
uint64_t* last_read_event_ts,
uint64_t* bundle_end_timestamp,
const uint8_t* parsing_buf,
const size_t pages_read,
CompactSchedBuffer* compact_sched_buf,
Expand All @@ -436,7 +426,7 @@ bool CpuReader::ProcessPagesForDataSource(
Bundler bundler(trace_writer, metadata,
ds_config->symbolize_ksyms ? symbolizer : nullptr, cpu,
ftrace_clock_snapshot, ftrace_clock, compact_sched_buf,
ds_config->compact_sched.enabled, *last_read_event_ts);
ds_config->compact_sched.enabled, *bundle_end_timestamp);

bool success = true;
size_t pages_parsed = 0;
Expand Down Expand Up @@ -472,14 +462,14 @@ bool CpuReader::ProcessPagesForDataSource(
kCompactSchedInternerThreshold;

if (page_header->lost_events || interner_past_threshold) {
// pass in an updated last_read_event_ts since we're starting a new
// pass in an updated bundle_end_timestamp since we're starting a new
// bundle, which needs to reference the last timestamp from the prior one.
bundler.StartNewPacket(page_header->lost_events, *last_read_event_ts);
bundler.StartNewPacket(page_header->lost_events, *bundle_end_timestamp);
}

FtraceParseStatus status =
ParsePagePayload(parse_pos, &page_header.value(), table, ds_config,
&bundler, metadata, last_read_event_ts);
&bundler, metadata, bundle_end_timestamp);

if (status != FtraceParseStatus::FTRACE_STATUS_OK) {
WriteAndSetParseError(&bundler, parse_errors, page_header->timestamp,
Expand Down Expand Up @@ -560,12 +550,12 @@ protos::pbzero::FtraceParseStatus CpuReader::ParsePagePayload(
const FtraceDataSourceConfig* ds_config,
Bundler* bundler,
FtraceMetadata* metadata,
uint64_t* last_read_event_ts) {
uint64_t* bundle_end_timestamp) {
const uint8_t* ptr = start_of_payload;
const uint8_t* const end = ptr + page_header->size;

uint64_t timestamp = page_header->timestamp;
uint64_t last_data_record_ts = 0;
uint64_t last_written_event_ts = 0;

while (ptr < end) {
EventHeader event_header;
Expand Down Expand Up @@ -652,27 +642,26 @@ protos::pbzero::FtraceParseStatus CpuReader::ParsePagePayload(
const CompactSchedWakingFormat& sched_waking_format =
table->compact_sched_format().sched_waking;

// Special-cased filtering of ftrace/print events to retain only the
// matching events.
bool event_written = true;
bool ftrace_print_filter_enabled =
ds_config->print_filter.has_value();

// compact sched_switch
if (compact_sched_enabled &&
ftrace_event_id == sched_switch_format.event_id) {
if (event_size < sched_switch_format.size)
return FtraceParseStatus::FTRACE_STATUS_SHORT_COMPACT_EVENT;

ParseSchedSwitchCompact(start, timestamp, &sched_switch_format,
bundler->compact_sched_buf(), metadata);

// compact sched_waking
} else if (compact_sched_enabled &&
ftrace_event_id == sched_waking_format.event_id) {
if (event_size < sched_waking_format.size)
return FtraceParseStatus::FTRACE_STATUS_SHORT_COMPACT_EVENT;

ParseSchedWakingCompact(start, timestamp, &sched_waking_format,
bundler->compact_sched_buf(), metadata);

} else if (ftrace_print_filter_enabled &&
ftrace_event_id == ds_config->print_filter->event_id()) {
if (ds_config->print_filter->IsEventInteresting(start, next)) {
Expand All @@ -683,6 +672,8 @@ protos::pbzero::FtraceParseStatus CpuReader::ParsePagePayload(
event, metadata)) {
return FtraceParseStatus::FTRACE_STATUS_INVALID_EVENT;
}
} else { // print event did NOT pass the filter
event_written = false;
}
} else {
// Common case: parse all other types of enabled events.
Expand All @@ -694,14 +685,17 @@ protos::pbzero::FtraceParseStatus CpuReader::ParsePagePayload(
return FtraceParseStatus::FTRACE_STATUS_INVALID_EVENT;
}
}
}
last_data_record_ts = timestamp;
ptr = next; // jump to next event
} // default case
if (event_written) {
last_written_event_ts = timestamp;
}
} // IsEventEnabled(id)
ptr = next;
} // case (data_record)
} // switch (event_header.type_or_length)
} // while (ptr < end)
if (last_data_record_ts)
*last_read_event_ts = last_data_record_ts;

if (last_written_event_ts)
*bundle_end_timestamp = last_written_event_ts;
return FtraceParseStatus::FTRACE_STATUS_OK;
}

Expand Down
21 changes: 12 additions & 9 deletions src/traced/probes/ftrace/cpu_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ class CpuReader {
std::unique_ptr<CompactSchedBuffer> compact_sched_;
};

// Helper class to generate `TracePacket`s when needed. Public for testing.
// Facilitates lazy proto writing - not every event in the kernel ring buffer
// is serialised in the trace, so this class allows for trace packets to be
// written only if there's at least one relevant event in the ring buffer
// batch. Public for testing.
class Bundler {
public:
Bundler(TraceWriter* trace_writer,
Expand All @@ -116,7 +119,7 @@ class CpuReader {
protos::pbzero::FtraceClock ftrace_clock,
CompactSchedBuffer* compact_sched_buf,
bool compact_sched_enabled,
uint64_t last_read_event_ts)
uint64_t previous_bundle_end_ts)
: trace_writer_(trace_writer),
metadata_(metadata),
symbolizer_(symbolizer),
Expand All @@ -125,7 +128,7 @@ class CpuReader {
ftrace_clock_(ftrace_clock),
compact_sched_enabled_(compact_sched_enabled),
compact_sched_buf_(compact_sched_buf),
initial_last_read_event_ts_(last_read_event_ts) {
initial_previous_bundle_end_ts_(previous_bundle_end_ts) {
if (compact_sched_enabled_)
compact_sched_buf_->Reset();
}
Expand All @@ -134,13 +137,14 @@ class CpuReader {

protos::pbzero::FtraceEventBundle* GetOrCreateBundle() {
if (!bundle_) {
StartNewPacket(false, initial_last_read_event_ts_);
StartNewPacket(false, initial_previous_bundle_end_ts_);
}
return bundle_;
}

// Forces the creation of a new TracePacket.
void StartNewPacket(bool lost_events, uint64_t last_read_event_timestamp);
void StartNewPacket(bool lost_events,
uint64_t previous_bundle_end_timestamp);

// This function is called after the contents of a FtraceBundle are written.
void FinalizeAndRunSymbolizer();
Expand All @@ -161,7 +165,7 @@ class CpuReader {
protos::pbzero::FtraceClock const ftrace_clock_;
const bool compact_sched_enabled_;
CompactSchedBuffer* const compact_sched_buf_;
uint64_t initial_last_read_event_ts_;
uint64_t initial_previous_bundle_end_ts_;

TraceWriter::TracePacketHandle packet_;
protos::pbzero::FtraceEventBundle* bundle_ = nullptr;
Expand Down Expand Up @@ -306,7 +310,7 @@ class CpuReader {
const FtraceDataSourceConfig* ds_config,
Bundler* bundler,
FtraceMetadata* metadata,
uint64_t* last_read_event_ts);
uint64_t* bundle_end_timestamp);

// Parse a single raw ftrace event beginning at |start| and ending at |end|
// and write it into the provided bundle as a proto.
Expand Down Expand Up @@ -374,7 +378,7 @@ class CpuReader {
size_t cpu,
const FtraceDataSourceConfig* ds_config,
base::FlatSet<protos::pbzero::FtraceParseStatus>* parse_errors,
uint64_t* last_read_event_ts,
uint64_t* bundle_end_timestamp,
const uint8_t* parsing_buf,
size_t pages_read,
CompactSchedBuffer* compact_sched_buf,
Expand Down Expand Up @@ -402,7 +406,6 @@ class CpuReader {
const ProtoTranslationTable* table_;
LazyKernelSymbolizer* symbolizer_;
base::ScopedFile trace_fd_;
uint64_t last_read_event_ts_ = 0;
protos::pbzero::FtraceClock ftrace_clock_{};
const FtraceClockSnapshot* ftrace_clock_snapshot_;
};
Expand Down
24 changes: 12 additions & 12 deletions src/traced/probes/ftrace/cpu_reader_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ TEST_F(CpuReaderParsePagePayloadTest, ParseSixSchedSwitch) {
EXPECT_EQ(last_read_event_ts_, 1'045'157'726'697'236ULL);

auto bundle = GetBundle();
EXPECT_EQ(0u, bundle.last_read_event_timestamp());
EXPECT_EQ(0u, bundle.previous_bundle_end_timestamp());
ASSERT_EQ(bundle.event().size(), 6u);
{
const protos::gen::FtraceEvent& event = bundle.event()[1];
Expand Down Expand Up @@ -1081,7 +1081,7 @@ TEST_F(CpuReaderParsePagePayloadTest, ParseSixSchedSwitchCompactFormat) {
auto bundle = GetBundle();

const auto& compact_sched = bundle.compact_sched();
EXPECT_EQ(0u, bundle.last_read_event_timestamp());
EXPECT_EQ(0u, bundle.previous_bundle_end_timestamp());

EXPECT_EQ(6u, compact_sched.switch_timestamp().size());
EXPECT_EQ(6u, compact_sched.switch_prev_state().size());
Expand Down Expand Up @@ -3569,9 +3569,9 @@ char g_last_ts_test_page_2[] = R"(
00000250: 645f 7072 696e 740a 0000 0000 0000 0000 d_print.........
)";

// Tests that |last_read_event_timestamp| is correctly updated in cases where a
// single ProcessPagesForDataSource call produces multiple ftrace bundle packets
// (due to splitting on data loss markers).
// Tests that |previous_bundle_end_timestamp| is correctly updated in cases
// where a single ProcessPagesForDataSource call produces multiple ftrace bundle
// packets (due to splitting on data loss markers).
TEST(CpuReaderTest, LastReadEventTimestampWithSplitBundles) {
// build test buffer with 3 pages
ProtoTranslationTable* table = GetTable("synthetic");
Expand Down Expand Up @@ -3615,8 +3615,8 @@ TEST(CpuReaderTest, LastReadEventTimestampWithSplitBundles) {
// Therefore we expect two bundles, as we start a new one whenever we
// encounter data loss (to set the |lost_events| field in the bundle proto).
//
// In terms of |last_read_event_timestamp|, the first bundle will emit zero
// since that's our initial input. The second bundle needs to emit the
// In terms of |previous_bundle_end_timestamp|, the first bundle will emit
// zero since that's our initial input. The second bundle needs to emit the
// timestamp of the last event in the first bundle.
auto packets = trace_writer.GetAllTracePackets();
ASSERT_EQ(2u, packets.size());
Expand All @@ -3625,18 +3625,18 @@ TEST(CpuReaderTest, LastReadEventTimestampWithSplitBundles) {
auto const& first_bundle = packets[0].ftrace_events();
EXPECT_FALSE(first_bundle.lost_events());
ASSERT_EQ(2u, first_bundle.event().size());
EXPECT_TRUE(first_bundle.has_last_read_event_timestamp());
EXPECT_EQ(0u, first_bundle.last_read_event_timestamp());
EXPECT_TRUE(first_bundle.has_previous_bundle_end_timestamp());
EXPECT_EQ(0u, first_bundle.previous_bundle_end_timestamp());

const uint64_t kSecondPrintTs = 1308020252356549ULL;
EXPECT_EQ(kSecondPrintTs, first_bundle.event()[1].timestamp());
EXPECT_EQ(0u, first_bundle.last_read_event_timestamp());
EXPECT_EQ(0u, first_bundle.previous_bundle_end_timestamp());

// 1 print + lost_events + updated last_read_event_timestamp
// 1 print + lost_events + updated previous_bundle_end_timestamp
auto const& second_bundle = packets[1].ftrace_events();
EXPECT_TRUE(second_bundle.lost_events());
EXPECT_EQ(1u, second_bundle.event().size());
EXPECT_EQ(kSecondPrintTs, second_bundle.last_read_event_timestamp());
EXPECT_EQ(kSecondPrintTs, second_bundle.previous_bundle_end_timestamp());
}

} // namespace
Expand Down
Loading

0 comments on commit 6f9673a

Please sign in to comment.