Skip to content

Commit

Permalink
Merge pull request #1486 from cloudflare/jsnell/perfetto-instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell authored Dec 8, 2023
2 parents cb93462 + daf2d0b commit 216838e
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 13 deletions.
46 changes: 41 additions & 5 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,14 @@ public:
kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = kj::none) override {
TRACE_EVENT("workerd", "WorkerEntrypoint::ResponseSentTracker::send()",
"statusCode", statusCode);
sent = true;
return inner.send(statusCode, statusText, headers, expectedBodySize);
}

kj::Own<kj::WebSocket> acceptWebSocket(const kj::HttpHeaders& headers) override {
TRACE_EVENT("workerd", "WorkerEntrypoint::ResponseSentTracker::acceptWebSocket()");
sent = true;
return inner.acceptWebSocket(headers);
}
Expand All @@ -146,6 +149,7 @@ kj::Own<WorkerInterface> WorkerEntrypoint::construct(
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson) {
TRACE_EVENT("workerd", "WorkerEntrypoint::construct()");
auto obj = kj::heap<WorkerEntrypoint>(kj::Badge<WorkerEntrypoint>(), threadContext,
waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson));
obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer),
Expand Down Expand Up @@ -180,6 +184,7 @@ void WorkerEntrypoint::init(
// IoContext, in which case we reuse it.

auto newContext = [&]() {
TRACE_EVENT("workerd", "WorkerEntrypoint::init() create new IoContext");
auto actorRef = actor.map([](kj::Own<Worker::Actor>& ptr) -> Worker::Actor& {
return *ptr;
});
Expand Down Expand Up @@ -210,7 +215,8 @@ void WorkerEntrypoint::init(
kj::Promise<void> WorkerEntrypoint::request(
kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) {
TRACE_EVENT("workerd", "WorkerEntrypoint::request()");
TRACE_EVENT("workerd", "WorkerEntrypoint::request()", "url", url.cStr(),
PERFETTO_FLOW_FROM_POINTER(this));
auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest,
"request() can only be called once"));
this->incomingRequest = kj::none;
Expand Down Expand Up @@ -252,21 +258,31 @@ kj::Promise<void> WorkerEntrypoint::request(

auto metricsForCatch = kj::addRef(incomingRequest->getMetrics());

TRACE_EVENT_BEGIN("workerd", "WorkerEntrypoint::request() waiting on context",
PERFETTO_TRACK_FROM_POINTER(&context),
PERFETTO_FLOW_FROM_POINTER(this));

return context.run(
[this, &context, method, url, &headers, &requestBody,
&metrics = incomingRequest->getMetrics(),
&wrappedResponse = *wrappedResponse, entrypointName = entrypointName]
(Worker::Lock& lock) mutable {
TRACE_EVENT("workerd", "WorkerEntrypoint::request() main");
TRACE_EVENT_END("workerd", PERFETTO_TRACK_FROM_POINTER(&context));
TRACE_EVENT("workerd", "WorkerEntrypoint::request() run",
PERFETTO_FLOW_FROM_POINTER(this));
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

return lock.getGlobalScope().request(
method, url, headers, requestBody, wrappedResponse,
cfBlobJson, lock, lock.getExportedHandler(entrypointName, context.getActor()));
}).then([this](api::DeferredProxy<void> deferredProxy) {
TRACE_EVENT("workerd", "WorkerEntrypoint::request() deferred proxy step",
PERFETTO_FLOW_FROM_POINTER(this));
proxyTask = kj::mv(deferredProxy.proxyTask);
}).exclusiveJoin(context.onAbort())
.catch_([this,&context](kj::Exception&& exception) mutable -> kj::Promise<void> {
TRACE_EVENT("workerd", "WorkerEntrypoint::request() catch",
PERFETTO_FLOW_FROM_POINTER(this));
// Log JS exceptions to the JS console, if fiddle is attached. This also has the effect of
// logging internal errors to syslog.
loggedExceptionEarlier = true;
Expand All @@ -276,8 +292,12 @@ kj::Promise<void> WorkerEntrypoint::request(
// Do not allow the exception to escape the isolate without waiting for the output gate to
// open. Note that in the success path, this is taken care of in `FetchEvent::respondWith()`.
return context.waitForOutputLocks()
.then([exception = kj::mv(exception)]() mutable
-> kj::Promise<void> { return kj::mv(exception); });
.then([exception = kj::mv(exception),
flow=PERFETTO_TERMINATING_FLOW_FROM_POINTER(this)]() mutable
-> kj::Promise<void> {
TRACE_EVENT("workerd", "WorkerEntrypoint::request() after output lock wait", flow);
return kj::mv(exception);
});
}).attach(kj::defer([this,incomingRequest = kj::mv(incomingRequest),&context]() mutable {
// The request has been canceled, but allow it to continue executing in the background.
if (context.isFailOpen()) {
Expand All @@ -289,6 +309,8 @@ kj::Promise<void> WorkerEntrypoint::request(
auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest));
waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise)));
})).then([this]() -> kj::Promise<void> {
TRACE_EVENT("workerd", "WorkerEntrypoint::request() finish proxying",
PERFETTO_TERMINATING_FLOW_FROM_POINTER(this));
// Now that the IoContext is dropped (unless it had waitUntil()s), we can finish proxying
// without pinning it or the isolate into memory.
KJ_IF_SOME(p, proxyTask) {
Expand All @@ -303,6 +325,8 @@ kj::Promise<void> WorkerEntrypoint::request(
method, url, &headers, &requestBody, metrics = kj::mv(metricsForCatch)]
(kj::Exception&& exception) mutable -> kj::Promise<void> {
// Don't return errors to end user.
TRACE_EVENT("workerd", "WorkerEntrypoint::request() exception",
PERFETTO_TERMINATING_FLOW_FROM_POINTER(this));

auto isInternalException = !jsg::isTunneledException(exception.getDescription())
&& !jsg::isDoNotLogException(exception.getDescription());
Expand Down Expand Up @@ -418,7 +442,7 @@ kj::Promise<void> WorkerEntrypoint::connect(kj::StringPtr host, const kj::HttpHe

void WorkerEntrypoint::prewarm(kj::StringPtr url) {
// Nothing to do, the worker is already loaded.

TRACE_EVENT("workerd", "WorkerEntrypoint::prewarm()", "url", url.cStr());
auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest,
"prewarm() can only be called once"));
incomingRequest->getMetrics().setIsPrewarm();
Expand All @@ -433,6 +457,7 @@ void WorkerEntrypoint::prewarm(kj::StringPtr url) {
kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
kj::Date scheduledTime,
kj::StringPtr cron) {
TRACE_EVENT("workerd", "WorkerEntrypoint::runScheduled()");
auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest,
"runScheduled() can only be called once"));
this->incomingRequest = kj::none;
Expand All @@ -454,6 +479,7 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
[scheduledTime, cron, entrypointName=entrypointName, &context,
&metrics = incomingRequest->getMetrics()]
(Worker::Lock& lock) mutable {
TRACE_EVENT("workerd", "WorkerEntrypoint::runScheduled() run");
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

lock.getGlobalScope().startScheduled(scheduledTime, cron, lock,
Expand All @@ -463,6 +489,7 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
static auto constexpr waitForFinished = [](IoContext& context,
kj::Own<IoContext::IncomingRequest> request)
-> kj::Promise<WorkerInterface::ScheduledResult> {
TRACE_EVENT("workerd", "WorkerEntrypoint::runScheduled() waitForFinished()");
bool completed = co_await request->finishScheduled();
co_return WorkerInterface::ScheduledResult {
.retry = context.shouldRetryScheduled(),
Expand All @@ -484,6 +511,8 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
// scheduled time) arrives while we still have one running and one scheduled, we discard the
// previous scheduled alarm.

TRACE_EVENT("workerd", "WorkerEntrypoint::runAlarmImpl()");

auto& context = incomingRequest->getContext();
auto& actor = KJ_REQUIRE_NONNULL(context.getActor(), "alarm() should only work with actors");

Expand Down Expand Up @@ -550,6 +579,7 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(

kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarm(
kj::Date scheduledTime) {
TRACE_EVENT("workerd", "WorkerEntrypoint::runAlarm()");
auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest,
"runAlarm() can only be called once"));
this->incomingRequest = kj::none;
Expand All @@ -560,6 +590,7 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarm(
}

kj::Promise<bool> WorkerEntrypoint::test() {
TRACE_EVENT("workerd", "WorkerEntrypoint::test()");
auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest,
"test() can only be called once"));
this->incomingRequest = kj::none;
Expand All @@ -570,6 +601,7 @@ kj::Promise<bool> WorkerEntrypoint::test() {
context.addWaitUntil(context.run(
[entrypointName=entrypointName, &context, &metrics = incomingRequest->getMetrics()]
(Worker::Lock& lock) mutable -> kj::Promise<void> {
TRACE_EVENT("workerd", "WorkerEntrypoint::test() run");
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

return context.awaitJs(lock, lock.getGlobalScope()
Expand All @@ -579,6 +611,7 @@ kj::Promise<bool> WorkerEntrypoint::test() {
static auto constexpr waitForFinished = [](IoContext& context,
kj::Own<IoContext::IncomingRequest> request)
-> kj::Promise<bool> {
TRACE_EVENT("workerd", "WorkerEntrypoint::test() waitForFinished()");
bool completed = co_await request->finishScheduled();
auto outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU;
co_return outcome == EventOutcome::OK;
Expand All @@ -589,6 +622,7 @@ kj::Promise<bool> WorkerEntrypoint::test() {

kj::Promise<WorkerInterface::CustomEvent::Result>
WorkerEntrypoint::customEvent(kj::Own<CustomEvent> event) {
TRACE_EVENT("workerd", "WorkerEntrypoint::customEvent()", "type", event->getType());
auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest,
"customEvent() can only be called once"));
this->incomingRequest = kj::none;
Expand All @@ -606,13 +640,15 @@ kj::Promise<WorkerInterface::CustomEvent::Result>

#ifdef KJ_DEBUG
void requestGc(const Worker& worker) {
TRACE_EVENT("workerd", "Debug: requestGc()");
jsg::V8StackScope stackScope;
auto lock = worker.getIsolate().getApi().lock(stackScope);
lock->requestGcForTesting();
}

template <typename T>
kj::Promise<T> addGcPassForTest(IoContext& context, kj::Promise<T> promise) {
TRACE_EVENT("workerd", "Debug: addGcPassForTest");
auto worker = kj::atomicAddRef(context.getWorker());
if constexpr (kj::isSameType<T, void>()) {
co_await promise;
Expand Down
10 changes: 5 additions & 5 deletions src/workerd/server/workerd-api.c++
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ Worker::Script::Source WorkerdApi::extractSource(kj::StringPtr name,
config::Worker::Reader conf,
Worker::ValidationErrorReporter& errorReporter,
capnp::List<config::Extension>::Reader extensions) {
TRACE_EVENT("workerd", "WorkerdApiIsolate::extractSource()");
TRACE_EVENT("workerd", "WorkerdApi::extractSource()");
switch (conf.which()) {
case config::Worker::MODULES: {
auto modules = conf.getModules();
Expand Down Expand Up @@ -243,7 +243,7 @@ kj::Array<Worker::Script::CompiledGlobal> WorkerdApi::compileScriptGlobals(
jsg::Lock& lockParam, config::Worker::Reader conf,
Worker::ValidationErrorReporter& errorReporter,
const jsg::CompilationObserver& observer) const {
TRACE_EVENT("workerd", "WorkerdApiIsolate::compileScriptGlobals()");
TRACE_EVENT("workerd", "WorkerdApi::compileScriptGlobals()");
// For Service Worker scripts, we support Wasm modules as globals, but they need to be loaded
// at script load time.

Expand Down Expand Up @@ -274,14 +274,14 @@ void WorkerdApi::compileModules(
jsg::Lock& lockParam, config::Worker::Reader conf,
Worker::ValidationErrorReporter& errorReporter,
capnp::List<config::Extension>::Reader extensions) const {
TRACE_EVENT("workerd", "WorkerdApiIsolate::compileModules()");
TRACE_EVENT("workerd", "WorkerdApi::compileModules()");
auto& lock = kj::downcast<JsgWorkerdIsolate::Lock>(lockParam);
lockParam.withinHandleScope([&] {
auto modules = jsg::ModuleRegistryImpl<JsgWorkerdIsolate_TypeWrapper>::from(lockParam);

for (auto module: conf.getModules()) {
auto path = kj::Path::parse(module.getName());
TRACE_EVENT("workerd", "WorkerdApiIsolate::compileModules() module",
TRACE_EVENT("workerd", "WorkerdApi::compileModules() module",
"path", path.toString(true).cStr());

switch (module.which()) {
Expand Down Expand Up @@ -642,7 +642,7 @@ void WorkerdApi::compileGlobals(
jsg::Lock& lockParam, kj::ArrayPtr<const Global> globals,
v8::Local<v8::Object> target,
uint32_t ownerId) const {
TRACE_EVENT("workerd", "WorkerdApiIsolate::compileGlobals()");
TRACE_EVENT("workerd", "WorkerdApi::compileGlobals()");
auto& lock = kj::downcast<JsgWorkerdIsolate::Lock>(lockParam);
lockParam.withinHandleScope([&] {
auto& featureFlags = *impl->features;
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/util/perfetto-tracing.c++
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ struct PerfettoSession::Impl {
kj::AutoCloseFd fd;
std::unique_ptr<perfetto::TracingSession> session;

Impl(int fd, kj::StringPtr categories)
: fd(fd), session(createTracingSession(fd, categories)) {
Impl(kj::AutoCloseFd dest, kj::StringPtr categories)
: fd(kj::mv(dest)), session(createTracingSession(fd.get(), categories)) {
session->StartBlocking();
}
};
Expand All @@ -91,7 +91,7 @@ PerfettoSession::PerfettoSession(kj::StringPtr path, kj::StringPtr categories)
: impl(kj::heap<Impl>(openTraceFile(path), categories)) {}

PerfettoSession::PerfettoSession(int fd, kj::StringPtr categories)
: impl(kj::heap<Impl>(fd, categories)) {}
: impl(kj::heap<Impl>(kj::AutoCloseFd(fd), categories)) {}

PerfettoSession::~PerfettoSession() noexcept(false) {
if (impl) {
Expand Down
8 changes: 8 additions & 0 deletions src/workerd/util/perfetto-tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,15 @@ class PerfettoSession {
friend constexpr bool _kj_internal_isPolymorphic(PerfettoSession::Impl*);
};

#define PERFETTO_FLOW_FROM_POINTER(ptr) perfetto::Flow::FromPointer(ptr)
#define PERFETTO_TERMINATING_FLOW_FROM_POINTER(ptr) perfetto::TerminatingFlow::FromPointer(ptr)
#define PERFETTO_TRACK_FROM_POINTER(ptr) perfetto::Track::FromPointer(ptr)

KJ_DECLARE_NON_POLYMORPHIC(PerfettoSession::Impl);
} // workerd

#else // defined(WORKERD_USE_PERFETTO)
struct PerfettoNoop {};
// We define non-op versions of the instrumentation macros here so that we can
// still instrument and build when perfetto is not enabled.
#define TRACE_EVENT(...)
Expand All @@ -54,4 +59,7 @@ KJ_DECLARE_NON_POLYMORPHIC(PerfettoSession::Impl);
#define TRACE_EVENT_INSTANT(...)
#define TRACE_COUNTER(...)
#define TRACE_EVENT_CATEGORY_ENABLED(...) false
#define PERFETTO_FLOW_FROM_POINTER(ptr) PerfettoNoop {}
#define PERFETTO_TERMINATING_FLOW_FROM_POINTER(ptr) PerfettoNoop {}
#define PERFETTO_TRACK_FROM_POINTER(ptr) PerfettoNoop {}
#endif // defined(WORKERD_USE_PERFETTO)

0 comments on commit 216838e

Please sign in to comment.