diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index a0775d40a1a..5c154090104 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -206,8 +206,8 @@ TraceItem::TraceItem(jsg::Lock& js, const Trace& trace) trace.onsetInfo.dispatchNamespace.map([](auto& ns) { return kj::str(ns); })), scriptTags(getTraceScriptTags(trace)), outcome(getTraceOutcome(trace)), - cpuTime(trace.outcomeInfo.cpuTime / kj::MILLISECONDS), - wallTime(trace.outcomeInfo.wallTime / kj::MILLISECONDS), + cpuTime(trace.cpuTime / kj::MILLISECONDS), + wallTime(trace.wallTime / kj::MILLISECONDS), truncated(trace.truncated) {} kj::Maybe TraceItem::getEvent(jsg::Lock& js) { diff --git a/src/workerd/io/trace-common-test.c++ b/src/workerd/io/trace-common-test.c++ index dfd0a7ecb83..6a4f80878c6 100644 --- a/src/workerd/io/trace-common-test.c++ +++ b/src/workerd/io/trace-common-test.c++ @@ -10,11 +10,11 @@ namespace { KJ_TEST("Tags work") { { - trace::Tag tag(kj::str("a"), 1UL); + trace::Tag tag(kj::str("a"), static_cast(1)); auto& key = KJ_ASSERT_NONNULL(tag.key.tryGet()); auto& value = KJ_ASSERT_NONNULL(tag.value.tryGet()); KJ_EXPECT(key == "a"); - KJ_EXPECT(value == 1UL); + KJ_EXPECT(value == static_cast(1)); KJ_EXPECT(tag.keyMatches("a"_kj)); capnp::MallocMessageBuilder message; @@ -35,16 +35,18 @@ KJ_TEST("Tags work") { { // The key can be a uint32_t - trace::Tag tag(1U, 2.0); + uint32_t a = 1; + trace::Tag tag(a, 2.0); auto key = KJ_ASSERT_NONNULL(tag.key.tryGet()); - KJ_EXPECT(key == 1U); - KJ_EXPECT(tag.keyMatches(1U)); + KJ_EXPECT(key == a); + KJ_EXPECT(tag.keyMatches(a)); } } KJ_TEST("Onset works") { - auto tags = kj::arr(trace::Tag(kj::str("a"), 1UL)); - trace::Onset onset(1U, kj::str("foo"), kj::str("bar"), kj::none, kj::str("baz"), kj::str("qux"), + uint32_t a = 1; + auto tags = kj::arr(trace::Tag(kj::str("a"), static_cast(1))); + trace::Onset onset(a, kj::str("foo"), kj::str("bar"), kj::none, kj::str("baz"), kj::str("qux"), kj::arr(kj::str("quux")), kj::str("corge"), kj::mv(tags)); capnp::MallocMessageBuilder message; @@ -53,7 +55,7 @@ KJ_TEST("Onset works") { auto reader = builder.asReader(); trace::Onset onset2(reader); - KJ_EXPECT(onset.accountId == 1U); + KJ_EXPECT(onset.accountId == a); KJ_EXPECT(KJ_ASSERT_NONNULL(onset.stableId) == "foo"_kj); KJ_EXPECT(KJ_ASSERT_NONNULL(onset.scriptName) == "bar"_kj); KJ_EXPECT(KJ_ASSERT_NONNULL(onset.dispatchNamespace) == "baz"_kj); @@ -79,7 +81,7 @@ KJ_TEST("Onset works") { KJ_EXPECT(encoded == check); auto onset3 = onset.clone(); - KJ_EXPECT(onset3.accountId == 1U); + KJ_EXPECT(onset3.accountId == a); KJ_EXPECT(KJ_ASSERT_NONNULL(onset3.stableId) == "foo"_kj); KJ_EXPECT(KJ_ASSERT_NONNULL(onset3.scriptName) == "bar"_kj); KJ_EXPECT(KJ_ASSERT_NONNULL(onset3.dispatchNamespace) == "baz"_kj); @@ -122,8 +124,9 @@ KJ_TEST("FetchEventInfo works") { } KJ_TEST("ActorFlushInfo works") { - auto tags = kj::arr(trace::Tag(trace::ActorFlushInfo::CommonTags::REASON, 1UL), - trace::Tag(trace::ActorFlushInfo::CommonTags::BROKEN, true)); + auto tags = + kj::arr(trace::Tag(trace::ActorFlushInfo::CommonTags::REASON, static_cast(1)), + trace::Tag(trace::ActorFlushInfo::CommonTags::BROKEN, true)); trace::ActorFlushInfo info(kj::mv(tags)); capnp::MallocMessageBuilder message; @@ -216,9 +219,10 @@ KJ_TEST("AlarmEventInfo works") { } KJ_TEST("QueueEventInfo works") { - trace::QueueEventInfo info(kj::str("foo"), 1U); + uint32_t a = 1; + trace::QueueEventInfo info(kj::str("foo"), a); KJ_EXPECT(info.queueName == "foo"_kj); - KJ_EXPECT(info.batchSize == 1U); + KJ_EXPECT(info.batchSize == a); capnp::MallocMessageBuilder message; auto builder = message.initRoot(); @@ -227,18 +231,19 @@ KJ_TEST("QueueEventInfo works") { auto reader = builder.asReader(); trace::QueueEventInfo info2(reader); KJ_EXPECT(info2.queueName == "foo"_kj); - KJ_EXPECT(info2.batchSize == 1U); + KJ_EXPECT(info2.batchSize == a); auto info3 = info.clone(); KJ_EXPECT(info3.queueName == "foo"_kj); - KJ_EXPECT(info3.batchSize == 1U); + KJ_EXPECT(info3.batchSize == a); } KJ_TEST("EmailEventInfo works") { - trace::EmailEventInfo info(kj::str("foo"), kj::str("bar"), 1U); + uint32_t a = 1; + trace::EmailEventInfo info(kj::str("foo"), kj::str("bar"), a); KJ_EXPECT(info.mailFrom == "foo"_kj); KJ_EXPECT(info.rcptTo == "bar"_kj); - KJ_EXPECT(info.rawSize == 1U); + KJ_EXPECT(info.rawSize == a); capnp::MallocMessageBuilder message; auto builder = message.initRoot(); @@ -248,12 +253,12 @@ KJ_TEST("EmailEventInfo works") { trace::EmailEventInfo info2(reader); KJ_EXPECT(info2.mailFrom == "foo"_kj); KJ_EXPECT(info2.rcptTo == "bar"_kj); - KJ_EXPECT(info2.rawSize == 1U); + KJ_EXPECT(info2.rawSize == a); auto info3 = info.clone(); KJ_EXPECT(info3.mailFrom == "foo"_kj); KJ_EXPECT(info3.rcptTo == "bar"_kj); - KJ_EXPECT(info3.rawSize == 1U); + KJ_EXPECT(info3.rawSize == a); } KJ_TEST("HibernatableWebSocketEventInfo works") { @@ -292,10 +297,8 @@ KJ_TEST("TraceEventInfo works") { } KJ_TEST("Outcome works") { - trace::Outcome outcome(EventOutcome::OK, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS); + trace::Outcome outcome(EventOutcome::OK); KJ_EXPECT(outcome.outcome == EventOutcome::OK); - KJ_EXPECT(outcome.cpuTime == 1 * kj::MILLISECONDS); - KJ_EXPECT(outcome.wallTime == 2 * kj::MILLISECONDS); capnp::MallocMessageBuilder message; auto builder = message.initRoot(); @@ -304,13 +307,9 @@ KJ_TEST("Outcome works") { auto reader = builder.asReader(); trace::Outcome outcome2(reader); KJ_EXPECT(outcome2.outcome == EventOutcome::OK); - KJ_EXPECT(outcome2.cpuTime == 1 * kj::MILLISECONDS); - KJ_EXPECT(outcome2.wallTime == 2 * kj::MILLISECONDS); auto outcome3 = outcome.clone(); KJ_EXPECT(outcome3.outcome == EventOutcome::OK); - KJ_EXPECT(outcome3.cpuTime == 1 * kj::MILLISECONDS); - KJ_EXPECT(outcome3.wallTime == 2 * kj::MILLISECONDS); } KJ_TEST("DiagnosticChannelEvent works") { @@ -359,12 +358,13 @@ KJ_TEST("Log works") { KJ_EXPECT(log3.message == "foo"_kj); } -KJ_TEST("LogV2 workers") { +KJ_TEST("LogV2 works") { kj::Date date = 0 * kj::MILLISECONDS + kj::UNIX_EPOCH; trace::LogV2 log(date, LogLevel::INFO, kj::heapArray(1)); KJ_EXPECT(log.timestamp == date); KJ_EXPECT(log.logLevel == LogLevel::INFO); - KJ_EXPECT(log.data.size() == 1); + KJ_EXPECT(KJ_ASSERT_NONNULL(log.message.tryGet>()).size() == 1); + KJ_EXPECT(!log.truncated); capnp::MallocMessageBuilder message; auto builder = message.initRoot(); @@ -374,12 +374,14 @@ KJ_TEST("LogV2 workers") { trace::LogV2 log2(reader); KJ_EXPECT(log2.timestamp == date); KJ_EXPECT(log2.logLevel == LogLevel::INFO); - KJ_EXPECT(log2.data.size() == 1); + KJ_EXPECT(KJ_ASSERT_NONNULL(log2.message.tryGet>()).size() == 1); + KJ_EXPECT(!log2.truncated); auto log3 = log.clone(); KJ_EXPECT(log3.timestamp == date); KJ_EXPECT(log3.logLevel == LogLevel::INFO); - KJ_EXPECT(log3.data.size() == 1); + KJ_EXPECT(KJ_ASSERT_NONNULL(log3.message.tryGet>()).size() == 1); + KJ_EXPECT(!log3.truncated); } KJ_TEST("Exception works") { @@ -409,10 +411,11 @@ KJ_TEST("Exception works") { } KJ_TEST("Subrequest works") { - trace::Subrequest subrequest(1U, + uint32_t a = 1; + trace::Subrequest subrequest(a, trace::Subrequest::Info(trace::FetchEventInfo( kj::HttpMethod::GET, kj::str("http://example.org"), kj::String(), nullptr))); - KJ_EXPECT(subrequest.id == 1U); + KJ_EXPECT(subrequest.id == a); KJ_EXPECT(KJ_ASSERT_NONNULL(subrequest.info).is()); capnp::MallocMessageBuilder message; @@ -421,18 +424,19 @@ KJ_TEST("Subrequest works") { auto reader = builder.asReader(); trace::Subrequest subrequest2(reader); - KJ_EXPECT(subrequest2.id == 1U); + KJ_EXPECT(subrequest2.id == a); KJ_EXPECT(KJ_ASSERT_NONNULL(subrequest.info).is()); auto subrequest3 = subrequest.clone(); - KJ_EXPECT(subrequest3.id == 1U); + KJ_EXPECT(subrequest3.id == a); KJ_EXPECT(KJ_ASSERT_NONNULL(subrequest.info).is()); } KJ_TEST("SubrequestOutcome works") { - trace::SubrequestOutcome outcome(1U, kj::none, trace::SpanEvent::Outcome::OK); - KJ_EXPECT(outcome.id == 1U); - KJ_EXPECT(outcome.outcome == trace::SpanEvent::Outcome::OK); + uint32_t a = 1; + trace::SubrequestOutcome outcome(a, kj::none, trace::Span::Outcome::OK); + KJ_EXPECT(outcome.id == a); + KJ_EXPECT(outcome.outcome == trace::Span::Outcome::OK); capnp::MallocMessageBuilder message; auto builder = message.initRoot(); @@ -440,19 +444,21 @@ KJ_TEST("SubrequestOutcome works") { auto reader = builder.asReader(); trace::SubrequestOutcome outcome2(reader); - KJ_EXPECT(outcome2.id == 1U); - KJ_EXPECT(outcome2.outcome == trace::SpanEvent::Outcome::OK); + KJ_EXPECT(outcome2.id == a); + KJ_EXPECT(outcome2.outcome == trace::Span::Outcome::OK); auto outcome3 = outcome.clone(); - KJ_EXPECT(outcome3.id == 1U); - KJ_EXPECT(outcome3.outcome == trace::SpanEvent::Outcome::OK); + KJ_EXPECT(outcome3.id == a); + KJ_EXPECT(outcome3.outcome == trace::Span::Outcome::OK); } -KJ_TEST("SpanEvent works") { - trace::SpanEvent event(1U, 0U, trace::SpanEvent::Outcome::OK, false, kj::none, nullptr); - KJ_EXPECT(event.id == 1U); - KJ_EXPECT(event.parent == 0U); - KJ_EXPECT(event.outcome == trace::SpanEvent::Outcome::OK); +KJ_TEST("Span works") { + uint32_t a = 1; + uint32_t b = 0; + trace::Span event(a, b, trace::Span::Outcome::OK, false, kj::none, nullptr); + KJ_EXPECT(event.id == a); + KJ_EXPECT(event.parent == b); + KJ_EXPECT(event.outcome == trace::Span::Outcome::OK); KJ_EXPECT(event.transactional == false); KJ_EXPECT(event.info == kj::none); KJ_EXPECT(event.tags.size() == 0); @@ -462,18 +468,18 @@ KJ_TEST("SpanEvent works") { event.copyTo(builder); auto reader = builder.asReader(); - trace::SpanEvent event2(reader); - KJ_EXPECT(event2.id == 1U); - KJ_EXPECT(event2.parent == 0U); - KJ_EXPECT(event2.outcome == trace::SpanEvent::Outcome::OK); + trace::Span event2(reader); + KJ_EXPECT(event2.id == a); + KJ_EXPECT(event2.parent == b); + KJ_EXPECT(event2.outcome == trace::Span::Outcome::OK); KJ_EXPECT(event2.transactional == false); KJ_EXPECT(event2.info == kj::none); KJ_EXPECT(event2.tags.size() == 0); auto event3 = event.clone(); - KJ_EXPECT(event3.id == 1U); - KJ_EXPECT(event3.parent == 0U); - KJ_EXPECT(event3.outcome == trace::SpanEvent::Outcome::OK); + KJ_EXPECT(event3.id == a); + KJ_EXPECT(event3.parent == b); + KJ_EXPECT(event3.outcome == trace::Span::Outcome::OK); KJ_EXPECT(event3.transactional == false); KJ_EXPECT(event3.info == kj::none); KJ_EXPECT(event3.tags.size() == 0); @@ -522,9 +528,11 @@ KJ_TEST("Metric works") { } KJ_TEST("Dropped works") { - trace::Dropped dropped(1U, 2U); - KJ_EXPECT(dropped.start == 1U); - KJ_EXPECT(dropped.end == 2U); + uint32_t a = 1; + uint32_t b = 2; + trace::Dropped dropped(a, b); + KJ_EXPECT(dropped.start == a); + KJ_EXPECT(dropped.end == b); capnp::MallocMessageBuilder message; auto builder = message.initRoot(); @@ -532,12 +540,12 @@ KJ_TEST("Dropped works") { auto reader = builder.asReader(); trace::Dropped dropped2(reader); - KJ_EXPECT(dropped2.start == 1U); - KJ_EXPECT(dropped2.end == 2U); + KJ_EXPECT(dropped2.start == a); + KJ_EXPECT(dropped2.end == b); auto dropped3 = dropped.clone(); - KJ_EXPECT(dropped3.start == 1U); - KJ_EXPECT(dropped3.end == 2U); + KJ_EXPECT(dropped3.start == a); + KJ_EXPECT(dropped3.end == b); } } // namespace diff --git a/src/workerd/io/trace-common.c++ b/src/workerd/io/trace-common.c++ index 8467dbbf08b..893c23069a9 100644 --- a/src/workerd/io/trace-common.c++ +++ b/src/workerd/io/trace-common.c++ @@ -16,6 +16,7 @@ namespace workerd::trace { namespace { Tags getTagsFromReader(const capnp::List::Reader& tags) { kj::Vector results; + results.reserve(tags.size()); for (auto tag: tags) { results.add(Tag(tag)); } @@ -273,24 +274,16 @@ Onset Onset::clone() const { // ====================================================================================== // Outcome -Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) - : outcome(outcome), - cpuTime(cpuTime), - wallTime(wallTime) {} +Outcome::Outcome(EventOutcome outcome): outcome(outcome) {} -Outcome::Outcome(rpc::Trace::Outcome::Reader reader) - : outcome(reader.getOutcome()), - cpuTime(reader.getCpuTime() * kj::MILLISECONDS), - wallTime(reader.getWallTime() * kj::MILLISECONDS) {} +Outcome::Outcome(rpc::Trace::Outcome::Reader reader): outcome(reader.getOutcome()) {} void Outcome::copyTo(rpc::Trace::Outcome::Builder builder) const { builder.setOutcome(outcome); - builder.setCpuTime(cpuTime / kj::MILLISECONDS); - builder.setWallTime(wallTime / kj::MILLISECONDS); } Outcome Outcome::clone() const { - return Outcome{outcome, cpuTime, wallTime}; + return Outcome{outcome}; } // ====================================================================================== @@ -548,10 +541,8 @@ kj::Vector getTraceItemsFromTraces(kj::ArrayPtr getTraceItemsFromReader( rpc::Trace::TraceEventInfo::Reader reader) { auto traces = reader.getTraces(); - kj::Vector results(traces.size()); - for (size_t n = 0; n < traces.size(); n++) { - results.add(TraceEventInfo::TraceItem(traces[n])); - } + kj::Vector results; + results.addAll(traces); return results.releaseAsArray(); } } // namespace @@ -623,6 +614,22 @@ DiagnosticChannelEvent DiagnosticChannelEvent::clone() const { // ====================================================================================== // Log +namespace { +kj::OneOf, kj::String> getMessageForLog( + const rpc::Trace::LogV2::Reader& reader) { + auto message = reader.getMessage(); + switch (message.which()) { + case rpc::Trace::LogV2::Message::Which::TEXT: { + return kj::str(message.getText()); + } + case rpc::Trace::LogV2::Message::Which::DATA: { + return kj::heapArray(message.getData()); + } + } + KJ_UNREACHABLE; +} +} // namespace + Log::Log(kj::Date timestamp, LogLevel logLevel, kj::String message) : timestamp(timestamp), logLevel(logLevel), @@ -643,21 +650,35 @@ Log Log::clone() const { return Log(timestamp, logLevel, kj::str(message)); } -LogV2::LogV2(kj::Date timestamp, LogLevel logLevel, kj::Array data, Tags tags) +LogV2::LogV2(kj::Date timestamp, + LogLevel logLevel, + kj::OneOf, kj::String> message, + Tags tags, + bool truncated) : timestamp(timestamp), logLevel(logLevel), - data(kj::mv(data)), - tags(kj::mv(tags)) {} + message(kj::mv(message)), + tags(kj::mv(tags)), + truncated(truncated) {} LogV2::LogV2(rpc::Trace::LogV2::Reader reader) : timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), logLevel(reader.getLogLevel()), - data(kj::heapArray(reader.getData())) {} + message(getMessageForLog(reader)), + truncated(reader.getTruncated()) {} void LogV2::copyTo(rpc::Trace::LogV2::Builder builder) const { builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); builder.setLogLevel(logLevel); - builder.setData(data); + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(str, kj::String) { + builder.initMessage().setText(str); + } + KJ_CASE_ONEOF(data, kj::Array) { + builder.initMessage().setData(data); + } + } + builder.setTruncated(truncated); auto outTags = builder.initTags(tags.size()); for (size_t n = 0; n < tags.size(); n++) { tags[n].copyTo(outTags[n]); @@ -669,7 +690,18 @@ LogV2 LogV2::clone() const { for (auto& tag: tags) { newTags.add(tag.clone()); } - return LogV2(timestamp, logLevel, kj::heapArray(data), newTags.releaseAsArray()); + auto newMessage = ([&]() -> kj::OneOf, kj::String> { + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(str, kj::String) { + return kj::str(str); + } + KJ_CASE_ONEOF(data, kj::Array) { + return kj::heapArray(data); + } + } + KJ_UNREACHABLE; + })(); + return LogV2(timestamp, logLevel, kj::mv(newMessage), newTags.releaseAsArray(), truncated); } // ====================================================================================== @@ -941,10 +973,10 @@ SubrequestOutcome SubrequestOutcome::clone() const { } // ====================================================================================== -// SpanEvent +// Span namespace { -kj::Maybe maybeGetInfo(const rpc::Trace::Span::Reader& reader) { +kj::Maybe maybeGetInfo(const rpc::Trace::Span::Reader& reader) { // if (!reader.hasInfo()) return kj::none; auto info = reader.getInfo(); switch (info.which()) { @@ -970,7 +1002,7 @@ kj::Maybe maybeGetInfo(const rpc::Trace::Span::Reader& reader) } } // namespace -SpanEvent::SpanEvent(uint32_t id, +Span::Span(uint32_t id, uint32_t parent, rpc::Trace::Span::SpanOutcome outcome, bool transactional, @@ -983,7 +1015,7 @@ SpanEvent::SpanEvent(uint32_t id, info(kj::mv(maybeInfo)), tags(kj::mv(tags)) {} -SpanEvent::SpanEvent(rpc::Trace::Span::Reader reader) +Span::Span(rpc::Trace::Span::Reader reader) : id(reader.getId()), parent(reader.getParent()), outcome(reader.getOutcome()), @@ -991,7 +1023,7 @@ SpanEvent::SpanEvent(rpc::Trace::Span::Reader reader) info(maybeGetInfo(reader)), tags(maybeGetTags(reader)) {} -void SpanEvent::copyTo(rpc::Trace::Span::Builder builder) const { +void Span::copyTo(rpc::Trace::Span::Builder builder) const { builder.setId(id); builder.setParent(parent); builder.setOutcome(outcome); @@ -1019,7 +1051,7 @@ void SpanEvent::copyTo(rpc::Trace::Span::Builder builder) const { } } -SpanEvent SpanEvent::clone() const { +Span Span::clone() const { auto newInfo = ([&]() -> kj::Maybe { KJ_IF_SOME(i, info) { KJ_SWITCH_ONEOF(i) { @@ -1040,7 +1072,7 @@ SpanEvent SpanEvent::clone() const { } return kj::none; })(); - return SpanEvent( + return Span( id, parent, outcome, transactional, kj::mv(newInfo), KJ_MAP(tag, tags) { return tag.clone(); }); } diff --git a/src/workerd/io/trace-common.h b/src/workerd/io/trace-common.h index 244483d1d2c..449fd411ecd 100644 --- a/src/workerd/io/trace-common.h +++ b/src/workerd/io/trace-common.h @@ -50,7 +50,7 @@ concept IsEnum = std::is_enum::value; // a double, a string, or an arbitrary byte array. When a byte array is used, the // specific format is specific to the tag key. No general assumptions can be made // about the value without knowledge of the specific key. -struct Tag { +struct Tag final { using TagValue = kj::OneOf>; using TagKey = kj::OneOf; TagKey key; @@ -74,7 +74,9 @@ struct Tag { }; using Tags = kj::Array; -// Metadata describing the onset of a trace session. +// Metadata describing the onset of a trace session. The first event in any trace session +// will always be an Onset event. It details information about which worker, script, etc +// is being traced. Every trace session will have exactly one Onset event. struct Onset final { explicit Onset() = default; Onset(kj::Maybe accountId, @@ -91,6 +93,8 @@ struct Onset final { Onset& operator=(Onset&&) = default; KJ_DISALLOW_COPY(Onset); + // Note that all of these fields could be represented by tags but are kept + // as separate fields for legacy reasons. kj::Maybe accountId = kj::none; kj::Maybe stableId = kj::none; kj::Maybe scriptName = kj::none; @@ -142,6 +146,10 @@ struct FetchEventInfo final { FetchEventInfo clone() const; }; +// The ActorFlushInfo struct is used to attach additional detail about the termination +// of an actor. It is to be included in a closing Span event. The ActorFlushInfo is not +// an event on it's own. This can be used, for instance, to communicate when an actor +// is broken and for what reason. struct ActorFlushInfo final { explicit ActorFlushInfo(Tags tags = nullptr); ActorFlushInfo(rpc::Trace::ActorFlushInfo::Reader reader); @@ -157,6 +165,9 @@ struct ActorFlushInfo final { ActorFlushInfo clone() const; }; +// The FetchResponseInfo struct is used to attach additional detail about the conclusion +// of fetch request. It is to be included in a closing Span event. The FetchResponseInfo +// is not an event on it's own. struct FetchResponseInfo final { explicit FetchResponseInfo(uint16_t statusCode); FetchResponseInfo(rpc::Trace::FetchResponseInfo::Reader reader); @@ -170,6 +181,7 @@ struct FetchResponseInfo final { FetchResponseInfo clone() const; }; +// Metadata describing the start of a JS RPC event struct JsRpcEventInfo final { explicit JsRpcEventInfo(kj::String methodName); JsRpcEventInfo(rpc::Trace::JsRpcEventInfo::Reader reader); @@ -183,6 +195,7 @@ struct JsRpcEventInfo final { JsRpcEventInfo clone() const; }; +// Metadata describing the start of a scheduled worker event (cron workers). struct ScheduledEventInfo final { explicit ScheduledEventInfo(double scheduledTime, kj::String cron); ScheduledEventInfo(rpc::Trace::ScheduledEventInfo::Reader reader); @@ -197,6 +210,7 @@ struct ScheduledEventInfo final { ScheduledEventInfo clone() const; }; +// Metadata describing the start of an alarm event. struct AlarmEventInfo final { explicit AlarmEventInfo(kj::Date scheduledTime); AlarmEventInfo(rpc::Trace::AlarmEventInfo::Reader reader); @@ -210,6 +224,7 @@ struct AlarmEventInfo final { AlarmEventInfo clone() const; }; +// Metadata describing the start of a queue event. struct QueueEventInfo final { explicit QueueEventInfo(kj::String queueName, uint32_t batchSize); QueueEventInfo(rpc::Trace::QueueEventInfo::Reader reader); @@ -224,6 +239,7 @@ struct QueueEventInfo final { QueueEventInfo clone() const; }; +// Metadata describing the start of an email event. struct EmailEventInfo final { explicit EmailEventInfo(kj::String mailFrom, kj::String rcptTo, uint32_t rawSize); EmailEventInfo(rpc::Trace::EmailEventInfo::Reader reader); @@ -239,6 +255,7 @@ struct EmailEventInfo final { EmailEventInfo clone() const; }; +// Metadata describing the start of a hibernatable web socket event. struct HibernatableWebSocketEventInfo final { struct Message {}; struct Close { @@ -262,7 +279,9 @@ struct HibernatableWebSocketEventInfo final { HibernatableWebSocketEventInfo clone() const; }; -struct CustomEventInfo { +// Metadata describing the start of a custom event (currently unused) but here +// to support legacy trace use cases. +struct CustomEventInfo final { explicit CustomEventInfo() {}; CustomEventInfo(rpc::Trace::CustomEventInfo::Reader reader) {}; CustomEventInfo(CustomEventInfo&&) = default; @@ -274,6 +293,8 @@ struct CustomEventInfo { } }; +// Metadata describing the start of a legacy tail event (that is, the event +// that delivers collected traces to a legacy tail worker) struct TraceEventInfo final { struct TraceItem; @@ -303,6 +324,10 @@ struct TraceEventInfo final { TraceEventInfo clone() const; }; +// The set of structs that make up the EventInfo union all represent trigger +// events for worker requests... That is, for instance, when a worker receives +// a fetch request, a FetchEventInfo will be emitted; if the worker receives a +// JS RPC request, a JsRpcEventInfo will be emitted; and so on. using EventInfo = kj::OneOf; -// Used to describe the final outcome of the trace. +// Used to describe the final outcome of the trace. Every trace session should +// have at most a single Outcome event that is the final event in the stream. struct Outcome final { Outcome() = default; - explicit Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime); + explicit Outcome(EventOutcome outcome); Outcome(rpc::Trace::Outcome::Reader reader); Outcome(Outcome&&) = default; Outcome& operator=(Outcome&&) = default; KJ_DISALLOW_COPY(Outcome); EventOutcome outcome = EventOutcome::UNKNOWN; - kj::Duration cpuTime; - kj::Duration wallTime; void copyTo(rpc::Trace::Outcome::Builder builder) const; Outcome clone() const; }; +// Describes an event caused by use of the Node.js diagnostics-channel API. struct DiagnosticChannelEvent final { explicit DiagnosticChannelEvent( kj::Date timestamp, kj::String channel, kj::Array message); @@ -346,6 +371,9 @@ struct DiagnosticChannelEvent final { DiagnosticChannelEvent clone() const; }; +// The Log struct is used by the legacy tracing (v1 tail workers) model and +// is unchanged for treaming traces. This is the struct that is used, for +// instance, to carry console.log outputs into the trace. struct Log final { explicit Log(kj::Date timestamp, LogLevel logLevel, kj::String message); Log(rpc::Trace::Log::Reader reader); @@ -363,9 +391,19 @@ struct Log final { Log clone() const; }; +// The LogV2 struct is used by the streaming trace model. It serves the same +// purpose as the Log struct above but allows for v8 serialized binary data +// as an alternative to plain text and allows for additional metadata tags +// to be added. It is separated out into a new struct in order to prevent +// introducing any backwards compat issues with the original legacy trace +// implementation. +// TODO(soon): Coallesce Log and LogV2 struct LogV2 final { - explicit LogV2( - kj::Date timestamp, LogLevel logLevel, kj::Array data, Tags tags = nullptr); + explicit LogV2(kj::Date timestamp, + LogLevel logLevel, + kj::OneOf, kj::String> message, + Tags tags = nullptr, + bool truncated = false); LogV2(rpc::Trace::LogV2::Reader reader); LogV2(LogV2&&) = default; LogV2& operator=(LogV2&&) = default; @@ -375,17 +413,30 @@ struct LogV2 final { kj::Date timestamp; LogLevel logLevel; - kj::Array data; + kj::OneOf, kj::String> message; Tags tags = nullptr; + bool truncated = false; void copyTo(rpc::Trace::LogV2::Builder builder) const; LogV2 clone() const; }; +// Describes metadata of an Exception that is added to the trace. struct Exception final { + // The Detail here is additional, optional information about the exception. + // Intende to provide additional context when appropriate. Not every error + // will have a Detail. struct Detail { + // If the JS Error object has a cause property, then it will be serialized + // into it's own Exception instance and attached here. kj::Maybe> cause = kj::none; + // If the JS Error object is an AggregateError or SuppressedError, or if + // it has an errors property like those standard types, then those errors + // will be serialized into their own Exception instances and attached here. kj::Array> errors = nullptr; + + // These flags match the additional workers-specific properties that we + // add to errors. See makeInternalError in jsg/util.c++. bool remote = false; bool retryable = false; bool overloaded = false; @@ -417,6 +468,8 @@ struct Exception final { Exception clone() const; }; +// Describes the start (and kind) of a subrequest initiated during the trace. +// For instance, a fetch request or a JS RPC call. struct Subrequest final { using Info = kj::OneOf; explicit Subrequest(uint32_t id, kj::Maybe info); @@ -432,6 +485,9 @@ struct Subrequest final { Subrequest clone() const; }; +// Describes the results of a subrequest initiated during the trace. The id +// must match a previously emitted Subrequest event, and the info (if provided) +// should correlate to the kind of subrequest that was made. struct SubrequestOutcome final { using Info = kj::OneOf; explicit SubrequestOutcome( @@ -443,37 +499,57 @@ struct SubrequestOutcome final { uint32_t id; kj::Maybe info; + + // A subrequest is a technically a special form of span, and therefore can + // have a Span outcome. rpc::Trace::Span::SpanOutcome outcome; void copyTo(rpc::Trace::SubrequestOutcome::Builder builder) const; SubrequestOutcome clone() const; }; -struct SpanEvent final { +// The span struct identifies the *close* of a span in the stream. A span is +// a logical grouping of events. Spans can be nested. +struct Span final { using Outcome = rpc::Trace::Span::SpanOutcome; using Info = kj::OneOf; - explicit SpanEvent(uint32_t id, + explicit Span(uint32_t id, uint32_t parentId, rpc::Trace::Span::SpanOutcome outcome, bool transactional = false, kj::Maybe maybeInfo = kj::none, Tags tags = nullptr); - SpanEvent(rpc::Trace::Span::Reader reader); - SpanEvent(SpanEvent&&) = default; - SpanEvent& operator=(SpanEvent&&) = default; - KJ_DISALLOW_COPY(SpanEvent); + Span(rpc::Trace::Span::Reader reader); + Span(Span&&) = default; + Span& operator=(Span&&) = default; + KJ_DISALLOW_COPY(Span); + // The id of the span being closed. This value should always be a value > 0. uint32_t id; + + // Identifies the parent span (if any). uint32_t parent; + rpc::Trace::Span::SpanOutcome outcome; + + // If a span is transactional, it means that any events occurring within that + // span should be considered invalid if the span outcome is canceled or errored. bool transactional; + + // When the span is initiated with an opening EventInfo event, then the closing + // span event should include an info field that matches the kind of event that + // started the span. For instance, if the span was started with a FetchEventInfo + // event, then the span should be closed with a FetchResponseInfo included in the + // Span event. kj::Maybe info; Tags tags; void copyTo(rpc::Trace::Span::Builder builder) const; - SpanEvent clone() const; + Span clone() const; }; +// A Mark is a simple event that can be used to mark a significant point in the +// trace. This currently has no analog in the current tail workers model. struct Mark final { explicit Mark(kj::String name); Mark(rpc::Trace::Mark::Reader reader); @@ -487,11 +563,20 @@ struct Mark final { Mark clone() const; }; +// A Metric is a key-value pair that can be emitted as part of the trace. Metrics +// include things like CPU time, Wall time, isolate heap memory usage, etc. The +// structure is left intentionally flexible to allow for a wide range of possible +// metrics to be emitted. This is a new feature in the streaming trace model. struct Metric final { using Key = kj::OneOf; using Value = kj::OneOf; using Type = rpc::Trace::Metric::Type; + enum class Common { + CPU_TIME, + WALL_TIME, + }; + explicit Metric(Type type, Key key, Value value); template @@ -516,9 +601,22 @@ struct Metric final { void copyTo(rpc::Trace::Metric::Builder builder) const; Metric clone() const; + + static inline Metric forWallTime(kj::Duration duration) { + return Metric(Type::COUNTER, Common::WALL_TIME, duration / kj::MILLISECONDS); + } + + static Metric forCpuTime(kj::Duration duration) { + return Metric(Type::COUNTER, Common::CPU_TIME, duration / kj::MILLISECONDS); + } }; using Metrics = kj::Array; +// A Dropped event can be used to indicate that a specific range of events were +// dropped from the stream. This would typically be used in cases where the process +// may be too overloaded to successfully deliver all events to the tail worker. +// The start/end values are inclusive and indicate the range of events (by sequence +// number) that were dropped. struct Dropped final { explicit Dropped(uint32_t start, uint32_t end); Dropped(rpc::Trace::Dropped::Reader reader); @@ -532,7 +630,8 @@ struct Dropped final { void copyTo(rpc::Trace::Dropped::Builder builder) const; Dropped clone() const; }; - +// The EventDetail union identifies types of events that happen *within* a span. +// They may occur any number of times within the span and are never terminal. using EventDetail = kj::OneOf; -// ====================================================================================== -// Represents a trace span. `Span` objects are delivered to `SpanObserver`s for recording. To -// create a `Span`, use a `SpanBuilder`. (Used in the legacy trace api) -struct Span { - using TagValue = kj::OneOf; - // TODO(someday): Support binary bytes, too. - using TagMap = kj::HashMap; - using Tag = TagMap::Entry; - - struct Log { - kj::Date timestamp; - Tag tag; - }; - - kj::ConstString operationName; - kj::Date startTime; - kj::Date endTime; - TagMap tags; - kj::Vector logs; - - // We set an arbitrary (-ish) cap on log messages for safety. If we drop logs because of this, - // we report how many in a final "dropped_logs" log. - // - // At the risk of being too clever, I chose a limit that is one below a power of two so that - // we'll typically have space for one last element available for the "dropped_logs" log without - // needing to grow the vector. - static constexpr auto MAX_LOGS = 1023; - uint droppedLogs = 0; - - explicit Span(kj::ConstString operationName, kj::Date startTime) - : operationName(kj::mv(operationName)), - startTime(startTime), - endTime(startTime) {} -}; - // ====================================================================================== // The base class for both the original legacy Trace (defined in trace-legacy.h) // and the new StreamingTrace (defined in trace-streaming.h) diff --git a/src/workerd/io/trace-legacy.c++ b/src/workerd/io/trace-legacy.c++ index 4523fd55e45..0d217c3c0d0 100644 --- a/src/workerd/io/trace-legacy.c++ +++ b/src/workerd/io/trace-legacy.c++ @@ -49,8 +49,8 @@ void Trace::copyTo(rpc::Trace::Builder builder) { builder.setTruncated(truncated); builder.setOutcome(outcomeInfo.outcome); - builder.setCpuTime(outcomeInfo.cpuTime / kj::MILLISECONDS); - builder.setWallTime(outcomeInfo.wallTime / kj::MILLISECONDS); + builder.setCpuTime(cpuTime / kj::MILLISECONDS); + builder.setWallTime(wallTime / kj::MILLISECONDS); KJ_IF_SOME(name, onsetInfo.scriptName) { builder.setScriptName(name); } @@ -144,8 +144,8 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev truncated = reader.getTruncated(); outcomeInfo.outcome = reader.getOutcome(); - outcomeInfo.cpuTime = reader.getCpuTime() * kj::MILLISECONDS; - outcomeInfo.wallTime = reader.getWallTime() * kj::MILLISECONDS; + cpuTime = reader.getCpuTime() * kj::MILLISECONDS; + wallTime = reader.getWallTime() * kj::MILLISECONDS; // mergeFrom() is called both when deserializing traces from a sandboxed // worker and when deserializing traces sent to a sandboxed trace worker. In @@ -318,7 +318,7 @@ void Trace::addDiagnosticChannelEvent(trace::DiagnosticChannelEvent&& event) { diagnosticChannelEvents.add(kj::mv(event)); } -void Trace::addSpan(const trace::Span&& span, kj::String spanContext) { +void Trace::addSpan(const Span&& span, kj::String spanContext) { // This is where we'll actually encode the span for now. // Drop any spans beyond MAX_LIME_SPANS. if (numSpans >= MAX_LIME_SPANS) { @@ -339,7 +339,7 @@ void Trace::addSpan(const trace::Span&& span, kj::String spanContext) { // TODO(cleanup): Create a function in kj::OneOf to automatically convert to a given type (i.e // String) to avoid having to handle each type explicitly here. - for (const trace::Span::TagMap::Entry& tag: span.tags) { + for (const Span::TagMap::Entry& tag: span.tags) { auto value = [&]() { KJ_SWITCH_ONEOF(tag.value) { KJ_CASE_ONEOF(str, kj::String) { @@ -368,4 +368,22 @@ void Trace::setFetchResponseInfo(trace::FetchResponseInfo&& info) { fetchResponseInfo = kj::mv(info); } +void Trace::addMetrics(trace::Metrics&& metrics) { + for (auto& metric: metrics) { + if (metric.keyMatches(trace::Metric::Common::CPU_TIME)) { + // The CPU_TIME metric will always be a int64_t converted from a kj::Duration + // If it's not, we'll ignore it. + KJ_IF_SOME(i, metric.value.tryGet()) { + cpuTime = i * kj::MILLISECONDS; + } + } else if (metric.keyMatches(trace::Metric::Common::WALL_TIME)) { + // The WALL_TIME metric will always be a int64_t converted from a kj::Duration + // If it's not, we'll ignore it. + KJ_IF_SOME(i, metric.value.tryGet()) { + wallTime = i * kj::MILLISECONDS; + } + } + } +} + } // namespace workerd diff --git a/src/workerd/io/trace-legacy.h b/src/workerd/io/trace-legacy.h index a76db347100..8fa3b7cc7b6 100644 --- a/src/workerd/io/trace-legacy.h +++ b/src/workerd/io/trace-legacy.h @@ -12,6 +12,46 @@ namespace workerd { +// ====================================================================================== +// Represents a trace span. `Span` objects are delivered to `SpanObserver`s for recording. To +// create a `Span`, use a `SpanBuilder`. (Used in the legacy trace api) +// Note that this is not the same thing as a trace::Span. This class is part of the legacy +// API for representing spans using log messages instead. +struct Span final { + using TagValue = kj::OneOf; + // TODO(someday): Support binary bytes, too. + using TagMap = kj::HashMap; + using Tag = TagMap::Entry; + + struct Log { + kj::Date timestamp; + Tag tag; + }; + + kj::ConstString operationName; + kj::Date startTime; + kj::Date endTime; + TagMap tags; + kj::Vector logs; + + // We set an arbitrary (-ish) cap on log messages for safety. If we drop logs because of this, + // we report how many in a final "dropped_logs" log. + // + // At the risk of being too clever, I chose a limit that is one below a power of two so that + // we'll typically have space for one last element available for the "dropped_logs" log without + // needing to grow the vector. + static constexpr uint16_t MAX_LOGS = 1023; + kj::uint droppedLogs = 0; + + explicit Span(kj::ConstString operationName, kj::Date startTime) + : operationName(kj::mv(operationName)), + startTime(startTime), + endTime(startTime) {} + Span(Span&&) = default; + Span& operator=(Span&&) = default; + KJ_DISALLOW_COPY(Span); +}; + // This is the original implementation of how trace worker data was collected. All of the // data is stored in an in-memory structure and delivered as a single unit to the trace // worker only when the request is fully completed. The data is held in memory and capped @@ -51,6 +91,8 @@ class Trace final: public kj::Refcounted, public trace::TraceBase { trace::Onset onsetInfo; trace::Outcome outcomeInfo{}; + kj::Duration cpuTime; + kj::Duration wallTime; kj::Vector logs; // TODO(o11y): Convert this to actually store spans. @@ -68,7 +110,7 @@ class Trace final: public kj::Refcounted, public trace::TraceBase { void setEventInfo(kj::Date timestamp, trace::EventInfo&& info); void setOutcome(trace::Outcome&& outcome); void setFetchResponseInfo(trace::FetchResponseInfo&& info); - void addSpan(const trace::Span&& span, kj::String spanContext); + void addSpan(const Span&& span, kj::String spanContext); void addLog(trace::Log&& log, bool isSpan = false); void addException(trace::Exception&& exception) override; @@ -78,9 +120,7 @@ class Trace final: public kj::Refcounted, public trace::TraceBase { // These are currently ignored for legacy traces. } - void addMetrics(trace::Metrics&& metrics) override { - // These are currently ignored for legacy traces. - } + void addMetrics(trace::Metrics&& metrics) override; void addSubrequest(trace::Subrequest&& subrequest) override { // These are currently ignored for legacy traces. diff --git a/src/workerd/io/trace-streaming-test.c++ b/src/workerd/io/trace-streaming-test.c++ index 4147ef37606..5af2e43fe61 100644 --- a/src/workerd/io/trace-streaming-test.c++ +++ b/src/workerd/io/trace-streaming-test.c++ @@ -12,20 +12,14 @@ struct MockTimeProvider final: public StreamingTrace::TimeProvider { kj::Date getNow() const override { return 0 * kj::MILLISECONDS + kj::UNIX_EPOCH; } - kj::Duration getCpuTime() const override { - return 0 * kj::MILLISECONDS; - } - kj::Duration getWallTime() const override { - return 0 * kj::MILLISECONDS; - } }; MockTimeProvider mockTimeProvider; -KJ_TEST("We can create a simple, empty streaming trace session with implicit canceled outcome") { +KJ_TEST("We can create a simple, empty streaming trace session with implicit unknown outcome") { trace::Onset onset; // In this test we are creating a simple trace with no events or spans. // The delegate should be called exactly twice, once with the onset and - // once with an implicit cancele outcome (since we're not explicitly calling + // once with an implicit unknown outcome (since we're not explicitly calling // setOutcome ourselves) int callCount = 0; kj::String id = nullptr; @@ -49,7 +43,7 @@ KJ_TEST("We can create a simple, empty streaming trace session with implicit can KJ_EXPECT(event.sequence == 1, "the sequence should have been incremented"); auto& outcome = KJ_ASSERT_NONNULL( event.event.tryGet(), "the event should be an outcome event"); - KJ_EXPECT(outcome.outcome == EventOutcome::CANCELED, "the outcome should be canceled"); + KJ_EXPECT(outcome.outcome == EventOutcome::UNKNOWN, "the outcome should be unknown"); break; } } @@ -62,8 +56,7 @@ KJ_TEST("We can create a simple, empty streaming trace session with expicit canc trace::Onset onset; // In this test we are creating a simple trace with no events or spans. // The delegate should be called exactly twice, once with the onset and - // once with an implicit cancele outcome (since we're not explicitly calling - // setOutcome ourselves) + // once with an explicit canceled outcome. int callCount = 0; kj::String id = nullptr; auto streamingTrace = workerd::StreamingTrace::create( @@ -89,18 +82,15 @@ KJ_TEST("We can create a simple, empty streaming trace session with expicit canc } } }, mockTimeProvider); - streamingTrace->setOutcome( - trace::Outcome(EventOutcome::CANCELED, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS)); + streamingTrace->setOutcome(trace::Outcome(EventOutcome::CANCELED)); KJ_EXPECT(callCount == 2); } KJ_TEST( - "We can create a simple, streaming trace session with a single implicitly canceled stage span") { + "We can create a simple, streaming trace session with a single implicitly unknown stage span") { trace::Onset onset; // In this test we are creating a simple trace with no events or spans. - // The delegate should be called exactly twice, once with the onset and - // once with an implicit cancele outcome (since we're not explicitly calling - // setOutcome ourselves) + // The delegate should be called exactly four times. int callCount = 0; kj::String id = nullptr; { @@ -135,8 +125,8 @@ KJ_TEST( KJ_EXPECT(event.span.transactional == false, "the root span should not be transactional"); KJ_EXPECT(event.id == id); auto& span = KJ_ASSERT_NONNULL( - event.event.tryGet(), "the event should be a span event"); - KJ_EXPECT(span.outcome == rpc::Trace::Span::SpanOutcome::CANCELED); + event.event.tryGet(), "the event should be a span event"); + KJ_EXPECT(span.outcome == rpc::Trace::Span::SpanOutcome::UNKNOWN); break; } case 3: { @@ -147,7 +137,7 @@ KJ_TEST( KJ_EXPECT(event.sequence == 3, "the sequence should have been incremented"); auto& outcome = KJ_ASSERT_NONNULL( event.event.tryGet(), "the event should be an outcome event"); - KJ_EXPECT(outcome.outcome == EventOutcome::CANCELED, "the outcome should be canceled"); + KJ_EXPECT(outcome.outcome == EventOutcome::UNKNOWN, "the outcome should be canceled"); break; } } @@ -164,9 +154,7 @@ KJ_TEST( KJ_TEST("We can create a simple, streaming trace session with a single explicitly canceled trace") { trace::Onset onset; // In this test we are creating a simple trace with no events or spans. - // The delegate should be called exactly twice, once with the onset and - // once with an implicit cancele outcome (since we're not explicitly calling - // setOutcome ourselves) + // The delegate should be called exactly five times. int callCount = 0; kj::String id = nullptr; { @@ -214,7 +202,7 @@ KJ_TEST("We can create a simple, streaming trace session with a single explicitl KJ_EXPECT(event.sequence, 3); KJ_EXPECT(event.id == id); auto& span = KJ_ASSERT_NONNULL( - event.event.tryGet(), "the event should be a span event"); + event.event.tryGet(), "the event should be a span event"); KJ_EXPECT(span.outcome == rpc::Trace::Span::SpanOutcome::CANCELED); break; } @@ -237,8 +225,7 @@ KJ_TEST("We can create a simple, streaming trace session with a single explicitl kj::HttpMethod::GET, kj::str("http://example.com"), kj::String(), {})); stage->addMark(trace::Mark(kj::str("bar"))); // Intentionally not calling setOutcome on the stage span or the trace itself. - streamingTrace->setOutcome( - trace::Outcome(EventOutcome::CANCELED, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS)); + streamingTrace->setOutcome(trace::Outcome(EventOutcome::CANCELED)); // Once the outcome is set, no more events should be emitted but calling the methods on // the span shouldn't crash or error. diff --git a/src/workerd/io/trace-streaming.c++ b/src/workerd/io/trace-streaming.c++ index bcba44f789f..30f62edd41b 100644 --- a/src/workerd/io/trace-streaming.c++ +++ b/src/workerd/io/trace-streaming.c++ @@ -10,6 +10,10 @@ namespace workerd { // ====================================================================================== // TailIDs namespace { +// The UuidId implementation is really intended only for testing and local development. +// In production, it likely makes more sense to use a RayID or something that can be +// better correlated to other diagnostic and tracing mechanisms, and that can be better +// guaranteed to be sufficiently unique across the entire production environment. class UuidId final: public StreamingTrace::IdFactory::Id { public: UuidId(): uuid(randomUUID(kj::none)) {} @@ -48,6 +52,8 @@ kj::Own StreamingTrace::IdFactory::newUuidIdFactory() kj::Own StreamingTrace::IdFactory::newIdFromString( kj::StringPtr str) { + // This is cheating a bit. We're not actually creating a UUID here but the UuidId class + // is really just a wrapper around a string so we can safely use it here. return kj::heap(kj::str(str)); } @@ -84,20 +90,6 @@ constexpr rpc::Trace::Span::SpanOutcome eventOutcomeToSpanOutcome(const EventOut } } // namespace -// When the StreamingTrace is created it will need to be passed an RPC Client to forward -// traces on to. The initial onset event will be forwarded as part of the initial request. -// The StreamingTrace will then be responsible for forwarding all subsequent trace events -// to the RPC Client. To prevent too many small requests, the StreamingTrace will need to -// buffer events until a certain threshold is reached, at which point it will forward the -// buffered events to the RPC Client. -// -// The threshold be the combined size of the events in the queue coupled with a timeout. -// Whichever condition is met first will trigger the forwarding of the events. -// -// The setEventInfo(...) *MUST* be called first. -// The setOutoutcomeInfo(...) *MUST* be called last. Once called, no other events can be -// emitted by the StreamingTrace. - struct StreamingTrace::Impl { kj::Own id; trace::Onset onsetInfo; @@ -136,9 +128,10 @@ StreamingTrace::StreamingTrace(kj::Own id, } StreamingTrace::~StreamingTrace() noexcept(false) { - KJ_IF_SOME(i, impl) { - setOutcome(trace::Outcome( - EventOutcome::CANCELED, i->timeProvider.getCpuTime(), i->timeProvider.getWallTime())); + if (impl != kj::none) { + // If the streaming tracing is dropped without having an outcome explicitly + // specified, the outcome is explicitly set to unknown. + setOutcome(trace::Outcome(EventOutcome::UNKNOWN)); } // Stage spans should be closed by calling setOutcome above KJ_ASSERT(spans.empty(), "all stage spans must be closed before the trace is destroyed"); @@ -242,8 +235,8 @@ struct StreamingTrace::Span::Impl { traceImpl->timeProvider.getNow(), trace.getNextSequence(), kj::mv(payload)); } - StreamEvent makeSpanEvent(rpc::Trace::Span::SpanOutcome outcome, - kj::Maybe maybeInfo, + StreamEvent makeSpan(rpc::Trace::Span::SpanOutcome outcome, + kj::Maybe maybeInfo, trace::Tags tags) { auto& tailId = KJ_ASSERT_NONNULL(trace.getId(), "the streaming trace is closed"); auto& traceImpl = KJ_ASSERT_NONNULL(trace.impl); @@ -254,7 +247,7 @@ struct StreamingTrace::Span::Impl { .transactional = (options & Options::TRANSACTIONAL) == Options::TRANSACTIONAL, }, traceImpl->timeProvider.getNow(), trace.getNextSequence(), - trace::SpanEvent(id, parentSpan, outcome, + trace::Span(id, parentSpan, outcome, (options & Options::TRANSACTIONAL) == Options::TRANSACTIONAL, kj::mv(maybeInfo), kj::mv(tags))); } @@ -270,20 +263,20 @@ StreamingTrace::Span::Span(kj::List& parentSpans, } void StreamingTrace::Span::setOutcome(rpc::Trace::Span::SpanOutcome outcome, - kj::Maybe maybeInfo, + kj::Maybe maybeInfo, trace::Tags tags) { KJ_IF_SOME(i, impl) { // Then close out the stream by destroying the impl for (auto& span: spans) { span.setOutcome(outcome); } KJ_ASSERT(spans.empty(), "all child spans must be closed before the parent span is closed"); - i->trace.addStreamEvent(i->makeSpanEvent(outcome, kj::mv(maybeInfo), kj::mv(tags))); + i->trace.addStreamEvent(i->makeSpan(outcome, kj::mv(maybeInfo), kj::mv(tags))); impl = kj::none; } } StreamingTrace::Span::~Span() noexcept(false) { - setOutcome(rpc::Trace::Span::SpanOutcome::CANCELED); + setOutcome(rpc::Trace::Span::SpanOutcome::UNKNOWN); KJ_ASSERT(spans.empty(), "all schild spans must be closed before the trace is destroyed"); } @@ -377,7 +370,7 @@ StreamEvent::Event getEvent(const rpc::Trace::StreamEvent::Reader& reader) { return trace::Dropped(event.getDropped()); } case rpc::Trace::StreamEvent::Event::Which::SPAN: { - return trace::SpanEvent(event.getSpan()); + return trace::Span(event.getSpan()); } case rpc::Trace::StreamEvent::Event::Which::INFO: { auto info = event.getInfo(); @@ -494,7 +487,7 @@ void StreamEvent::copyTo(rpc::Trace::StreamEvent::Builder builder) const { KJ_CASE_ONEOF(dropped, trace::Dropped) { dropped.copyTo(builder.getEvent().getDropped()); } - KJ_CASE_ONEOF(span, trace::SpanEvent) { + KJ_CASE_ONEOF(span, trace::Span) { span.copyTo(builder.getEvent().getSpan()); } KJ_CASE_ONEOF(info, Info) { @@ -582,7 +575,7 @@ StreamEvent StreamEvent::clone() const { KJ_CASE_ONEOF(dropped, trace::Dropped) { return dropped.clone(); } - KJ_CASE_ONEOF(span, trace::SpanEvent) { + KJ_CASE_ONEOF(span, trace::Span) { return span.clone(); } KJ_CASE_ONEOF(info, Info) { diff --git a/src/workerd/io/trace-streaming.h b/src/workerd/io/trace-streaming.h index 78dc085c481..ffa7062aeec 100644 --- a/src/workerd/io/trace-streaming.h +++ b/src/workerd/io/trace-streaming.h @@ -13,13 +13,15 @@ namespace workerd { // // The streaming trace itself is considered the root span, whose span ID is always 0. The // root span will always start with an Onset event that communicates basic metadata about -// the worker being traced; and always ends with an Outcome event that communicates the -// final disposition of the traced worker. +// the worker being traced (for instance, script ID, script version, etc); and always ends +// with an Outcome event that communicates the final disposition of the traced worker. // // The root span may have zero or more "Stage Spans". These represent pipeline stages. // There is no requirement that a streaming trace will cover multiple stages in a worker // pipeline but the design allows for it. Generally speaking there should be at least -// one stage span per streaming trace. +// one stage span per streaming trace. This model allows for a single trace to cover +// multiple stages of a pipeline but in actual practice we're most likely to see only +// a single request per trace, even with durable objects. // // A stage span will never be transactional and will always have the root span as its // parent. It should always start with an "Info" event that describes the trigger event @@ -56,7 +58,8 @@ namespace workerd { // ====================================================================================== // StreamEvent -// All events on the streaming trace are StreamEvents. +// All events on the streaming trace are StreamEvents. A StreamEvent is essentialy +// just an envelope for the actual event data. struct StreamEvent final { // The ID of the streaming trace session. This is used to correlate all events // occurring within the same trace session. @@ -79,8 +82,7 @@ struct StreamEvent final { using Info = trace::EventInfo; using Detail = trace::EventDetail; - using Event = - kj::OneOf; + using Event = kj::OneOf; Event event; explicit StreamEvent( @@ -134,8 +136,6 @@ class StreamingTrace final { // to abstract exactly how the trace gets current time. struct TimeProvider { virtual kj::Date getNow() const = 0; - virtual kj::Duration getCpuTime() const = 0; - virtual kj::Duration getWallTime() const = 0; }; static kj::Own create(IdFactory& idFactory, @@ -185,7 +185,7 @@ class StreamingTrace final { virtual ~Span() noexcept(false); void setOutcome(rpc::Trace::Span::SpanOutcome outcome, - kj::Maybe maybeInfo = kj::none, + kj::Maybe maybeInfo = kj::none, trace::Tags tags = nullptr); void addLog(trace::LogV2&& log); diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index ccbec206c06..99470a382b1 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -96,9 +96,10 @@ kj::Own PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline kj::Maybe dispatchNamespace, kj::Array scriptTags, kj::Maybe entrypoint) { - auto trace = kj::refcounted(trace::Onset(kj::none, // TODO(now): Pass the account id - kj::mv(stableId), kj::mv(scriptName), kj::mv(scriptVersion), kj::mv(dispatchNamespace), - kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint))); + // TODO(streaming-trace): Pass the account id + auto trace = kj::refcounted( + trace::Onset(kj::none, kj::mv(stableId), kj::mv(scriptName), kj::mv(scriptVersion), + kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint))); traces.add(kj::addRef(*trace)); return kj::refcounted(kj::addRef(*this), kj::mv(trace), pipelineLogLevel); } @@ -125,7 +126,7 @@ void WorkerTracer::log(kj::Date timestamp, LogLevel logLevel, kj::String message trace->addLog(trace::Log(timestamp, logLevel, kj::mv(message)), isSpan); } -void WorkerTracer::addSpan(const trace::Span& span, kj::String spanContext) { +void WorkerTracer::addSpan(const Span& span, kj::String spanContext) { // TODO(someday): For now, we're using logLevel == none as a hint to avoid doing anything // expensive while tracing. We may eventually want separate configuration for exceptions vs. // logs. @@ -169,6 +170,10 @@ void WorkerTracer::setEventInfo(kj::Date timestamp, trace::EventInfo&& info) { trace->setEventInfo(timestamp, kj::mv(info)); } +void WorkerTracer::addMetrics(trace::Metrics&& metrics) { + trace->addMetrics(kj::mv(metrics)); +} + void WorkerTracer::setOutcomeInfo(trace::Outcome&& info) { trace->setOutcome(kj::mv(info)); } diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index 16c10411fdd..66a273c613e 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -30,8 +30,6 @@ namespace workerd { using kj::byte; using kj::uint; -using Span = trace::Span; - // ======================================================================================= class WorkerTracer; @@ -97,6 +95,9 @@ class WorkerTracer final: public kj::Refcounted { // Sets info about the event that triggered the trace. Must not be called more than once. void setEventInfo(kj::Date timestamp, trace::EventInfo&&); + // Add metrics to the trace. Can be called more than once. + void addMetrics(trace::Metrics&& metrics); + // Sets info about the result of this trace. Can be called more than once, overriding the // previous detail. void setOutcomeInfo(trace::Outcome&& info); diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index a0c21edbbce..7b5a390d838 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -32,6 +32,27 @@ struct Trace @0x8e8d911203762d34 { message @2 :Text; } + # Streaming tail workers support an expanded version of Log that supports arbitrary + # v8 serialized data or a text message. We define this as a separate new + # struct in order to avoid any possible non-backwards compatible disruption to anything + # using the existing Log struct in the original trace worker impl. The two structs are + # virtually identical with the exception that the message field can be v8 serialized data. + struct LogV2 { + timestampNs @0 :Int64; + logLevel @1 :Log.Level; + message :union { + # When data is used, the LogV2 message is expected to be a v8 serialized value. + data @2 :Data; + text @3 :Text; + } + tags @4 :List(Tag); + # A Log entry might be truncated if it exceeds the maximum size limit configured + # for the process. Truncation should occur before the data is serialized so it + # should always be possible to deserialize the data field successfully, regardless + # of the specific format of the data. + truncated @5 :Bool; + } + exceptions @1 :List(Exception); struct Exception { timestampNs @0 :Int64; @@ -184,11 +205,9 @@ struct Trace @0x8e8d911203762d34 { # the entire trace steam. struct Outcome { outcome @0 :EventOutcome; - cpuTime @1 :UInt64; - wallTime @2 :UInt64; # Any additional arbitrary metadata that should be associated with the outcome. - tags @3 :List(Tag); + tags @1 :List(Tag); } # An outcome information object that describes additional detail about the outcome @@ -201,23 +220,10 @@ struct Trace @0x8e8d911203762d34 { tags @0 :List(Tag); } - # Streaming tail workers support an expanded version of Log that supports arbitrary - # v8 serialized data rather than a text message. We define this as a separate new - # struct in order to avoid any possible non-backwards compatible disruption to anything - # using the existing Log struct in the original trace worker impl. The two structs are - # virtually identical with the exception that the message field here will always be - # v8 serialized data. - struct LogV2 { - timestampNs @0 :Int64; - logLevel @1 :Log.Level; - data @2 :Data; - tags @3 :List(Tag); - } - # A detail event indicating a subrequest that was made during a request. This # can be a fetch subrequest, an RPC subrequest, a call out to a KV namespace, # etc. - # TODO(now): This needs to be flushed out more. + # TODO(streaming-trace): This needs to be flushed out more. struct Subrequest { # A monotonic sequence number that is unique within the tail. The id is # used primarily to correlate with the SubrequestOutcome. diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index fd21a0ae10a..af337963dfb 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1456,7 +1456,7 @@ public: if (fetchStatus != 0) { t->setFetchResponseInfo(trace::FetchResponseInfo(fetchStatus)); } - t->setOutcomeInfo(trace::Outcome(outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS)); + t->setOutcomeInfo(trace::Outcome(outcome)); } } @@ -1571,7 +1571,13 @@ public: } kj::Promise customEvent(kj::Own event) override { - return KJ_ASSERT_NONNULL(inner).customEvent(kj::mv(event)); + try { + co_return co_await KJ_ASSERT_NONNULL(inner).customEvent(kj::mv(event)); + } catch (...) { + auto exception = kj::getCaughtExceptionAsKj(); + reportFailure(exception, FailureSource::OTHER); + throw exception; + } } private: @@ -1692,15 +1698,15 @@ public: tailWorkers.add(service.startRequest({})); } waitUntilTasks.add(childTracer->onComplete().then( - [this, tailWorkers = kj::mv(tailWorkers)]( - kj::Array> traces) mutable -> kj::Promise { + kj::coCapture([this, tailWorkers = kj::mv(tailWorkers)]( + kj::Array> traces) mutable -> kj::Promise { for (auto& worker: tailWorkers) { auto event = kj::heap( workerd::api::TraceCustomEventImpl::TYPE, waitUntilTasks, mapAddRef(traces)); - co_return co_await worker->customEvent(kj::mv(event)).ignoreResult(); + co_await worker->customEvent(kj::mv(event)).ignoreResult(); } co_return; - })); + }))); }; return newWorkerEntrypoint(threadContext, kj::atomicAddRef(*worker), entrypointName, kj::mv(actor), kj::Own(this, kj::NullDisposer::instance),