From daf2d0b4cd60e4be16e1833839b958c272244d99 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Fri, 8 Dec 2023 11:02:29 -0800 Subject: [PATCH] More perfetto instrumentation and fixes --- src/workerd/io/worker-entrypoint.c++ | 46 ++++++++++++++++++++++++--- src/workerd/util/perfetto-tracing.c++ | 6 ++-- src/workerd/util/perfetto-tracing.h | 8 +++++ 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index ef9063026fa..73df5e2de1d 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -119,11 +119,14 @@ public: kj::Own send( uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers, kj::Maybe expectedBodySize = kj::none) override { + TRACE_EVENT("workerd", "WorkerEntrypoint::ResponseSentTracker::send()", + "statusCode", statusCode); sent = true; return inner.send(statusCode, statusText, headers, expectedBodySize); } kj::Own acceptWebSocket(const kj::HttpHeaders& headers) override { + TRACE_EVENT("workerd", "WorkerEntrypoint::ResponseSentTracker::acceptWebSocket()"); sent = true; return inner.acceptWebSocket(headers); } @@ -146,6 +149,7 @@ kj::Own WorkerEntrypoint::construct( bool tunnelExceptions, kj::Maybe> workerTracer, kj::Maybe cfBlobJson) { + TRACE_EVENT("workerd", "WorkerEntrypoint::construct()"); auto obj = kj::heap(kj::Badge(), threadContext, waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson)); obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer), @@ -180,6 +184,7 @@ void WorkerEntrypoint::init( // IoContext, in which case we reuse it. auto newContext = [&]() { + TRACE_EVENT("workerd", "WorkerEntrypoint::init() create new IoContext"); auto actorRef = actor.map([](kj::Own& ptr) -> Worker::Actor& { return *ptr; }); @@ -210,7 +215,8 @@ void WorkerEntrypoint::init( kj::Promise WorkerEntrypoint::request( kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, kj::AsyncInputStream& requestBody, Response& response) { - TRACE_EVENT("workerd", "WorkerEntrypoint::request()"); + TRACE_EVENT("workerd", "WorkerEntrypoint::request()", "url", url.cStr(), + PERFETTO_FLOW_FROM_POINTER(this)); auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "request() can only be called once")); this->incomingRequest = kj::none; @@ -252,21 +258,31 @@ kj::Promise WorkerEntrypoint::request( auto metricsForCatch = kj::addRef(incomingRequest->getMetrics()); + TRACE_EVENT_BEGIN("workerd", "WorkerEntrypoint::request() waiting on context", + PERFETTO_TRACK_FROM_POINTER(&context), + PERFETTO_FLOW_FROM_POINTER(this)); + return context.run( [this, &context, method, url, &headers, &requestBody, &metrics = incomingRequest->getMetrics(), &wrappedResponse = *wrappedResponse, entrypointName = entrypointName] (Worker::Lock& lock) mutable { - TRACE_EVENT("workerd", "WorkerEntrypoint::request() main"); + TRACE_EVENT_END("workerd", PERFETTO_TRACK_FROM_POINTER(&context)); + TRACE_EVENT("workerd", "WorkerEntrypoint::request() run", + PERFETTO_FLOW_FROM_POINTER(this)); jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); return lock.getGlobalScope().request( method, url, headers, requestBody, wrappedResponse, cfBlobJson, lock, lock.getExportedHandler(entrypointName, context.getActor())); }).then([this](api::DeferredProxy deferredProxy) { + TRACE_EVENT("workerd", "WorkerEntrypoint::request() deferred proxy step", + PERFETTO_FLOW_FROM_POINTER(this)); proxyTask = kj::mv(deferredProxy.proxyTask); }).exclusiveJoin(context.onAbort()) .catch_([this,&context](kj::Exception&& exception) mutable -> kj::Promise { + TRACE_EVENT("workerd", "WorkerEntrypoint::request() catch", + PERFETTO_FLOW_FROM_POINTER(this)); // Log JS exceptions to the JS console, if fiddle is attached. This also has the effect of // logging internal errors to syslog. loggedExceptionEarlier = true; @@ -276,8 +292,12 @@ kj::Promise WorkerEntrypoint::request( // Do not allow the exception to escape the isolate without waiting for the output gate to // open. Note that in the success path, this is taken care of in `FetchEvent::respondWith()`. return context.waitForOutputLocks() - .then([exception = kj::mv(exception)]() mutable - -> kj::Promise { return kj::mv(exception); }); + .then([exception = kj::mv(exception), + flow=PERFETTO_TERMINATING_FLOW_FROM_POINTER(this)]() mutable + -> kj::Promise { + TRACE_EVENT("workerd", "WorkerEntrypoint::request() after output lock wait", flow); + return kj::mv(exception); + }); }).attach(kj::defer([this,incomingRequest = kj::mv(incomingRequest),&context]() mutable { // The request has been canceled, but allow it to continue executing in the background. if (context.isFailOpen()) { @@ -289,6 +309,8 @@ kj::Promise WorkerEntrypoint::request( auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); })).then([this]() -> kj::Promise { + TRACE_EVENT("workerd", "WorkerEntrypoint::request() finish proxying", + PERFETTO_TERMINATING_FLOW_FROM_POINTER(this)); // Now that the IoContext is dropped (unless it had waitUntil()s), we can finish proxying // without pinning it or the isolate into memory. KJ_IF_SOME(p, proxyTask) { @@ -303,6 +325,8 @@ kj::Promise WorkerEntrypoint::request( method, url, &headers, &requestBody, metrics = kj::mv(metricsForCatch)] (kj::Exception&& exception) mutable -> kj::Promise { // Don't return errors to end user. + TRACE_EVENT("workerd", "WorkerEntrypoint::request() exception", + PERFETTO_TERMINATING_FLOW_FROM_POINTER(this)); auto isInternalException = !jsg::isTunneledException(exception.getDescription()) && !jsg::isDoNotLogException(exception.getDescription()); @@ -418,7 +442,7 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, const kj::HttpHe void WorkerEntrypoint::prewarm(kj::StringPtr url) { // Nothing to do, the worker is already loaded. - + TRACE_EVENT("workerd", "WorkerEntrypoint::prewarm()", "url", url.cStr()); auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "prewarm() can only be called once")); incomingRequest->getMetrics().setIsPrewarm(); @@ -433,6 +457,7 @@ void WorkerEntrypoint::prewarm(kj::StringPtr url) { kj::Promise WorkerEntrypoint::runScheduled( kj::Date scheduledTime, kj::StringPtr cron) { + TRACE_EVENT("workerd", "WorkerEntrypoint::runScheduled()"); auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "runScheduled() can only be called once")); this->incomingRequest = kj::none; @@ -454,6 +479,7 @@ kj::Promise WorkerEntrypoint::runScheduled( [scheduledTime, cron, entrypointName=entrypointName, &context, &metrics = incomingRequest->getMetrics()] (Worker::Lock& lock) mutable { + TRACE_EVENT("workerd", "WorkerEntrypoint::runScheduled() run"); jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); lock.getGlobalScope().startScheduled(scheduledTime, cron, lock, @@ -463,6 +489,7 @@ kj::Promise WorkerEntrypoint::runScheduled( static auto constexpr waitForFinished = [](IoContext& context, kj::Own request) -> kj::Promise { + TRACE_EVENT("workerd", "WorkerEntrypoint::runScheduled() waitForFinished()"); bool completed = co_await request->finishScheduled(); co_return WorkerInterface::ScheduledResult { .retry = context.shouldRetryScheduled(), @@ -484,6 +511,8 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( // scheduled time) arrives while we still have one running and one scheduled, we discard the // previous scheduled alarm. + TRACE_EVENT("workerd", "WorkerEntrypoint::runAlarmImpl()"); + auto& context = incomingRequest->getContext(); auto& actor = KJ_REQUIRE_NONNULL(context.getActor(), "alarm() should only work with actors"); @@ -550,6 +579,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( kj::Promise WorkerEntrypoint::runAlarm( kj::Date scheduledTime) { + TRACE_EVENT("workerd", "WorkerEntrypoint::runAlarm()"); auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "runAlarm() can only be called once")); this->incomingRequest = kj::none; @@ -560,6 +590,7 @@ kj::Promise WorkerEntrypoint::runAlarm( } kj::Promise WorkerEntrypoint::test() { + TRACE_EVENT("workerd", "WorkerEntrypoint::test()"); auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "test() can only be called once")); this->incomingRequest = kj::none; @@ -570,6 +601,7 @@ kj::Promise WorkerEntrypoint::test() { context.addWaitUntil(context.run( [entrypointName=entrypointName, &context, &metrics = incomingRequest->getMetrics()] (Worker::Lock& lock) mutable -> kj::Promise { + TRACE_EVENT("workerd", "WorkerEntrypoint::test() run"); jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); return context.awaitJs(lock, lock.getGlobalScope() @@ -579,6 +611,7 @@ kj::Promise WorkerEntrypoint::test() { static auto constexpr waitForFinished = [](IoContext& context, kj::Own request) -> kj::Promise { + TRACE_EVENT("workerd", "WorkerEntrypoint::test() waitForFinished()"); bool completed = co_await request->finishScheduled(); auto outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU; co_return outcome == EventOutcome::OK; @@ -589,6 +622,7 @@ kj::Promise WorkerEntrypoint::test() { kj::Promise WorkerEntrypoint::customEvent(kj::Own event) { + TRACE_EVENT("workerd", "WorkerEntrypoint::customEvent()", "type", event->getType()); auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "customEvent() can only be called once")); this->incomingRequest = kj::none; @@ -606,6 +640,7 @@ kj::Promise #ifdef KJ_DEBUG void requestGc(const Worker& worker) { + TRACE_EVENT("workerd", "Debug: requestGc()"); jsg::V8StackScope stackScope; auto lock = worker.getIsolate().getApi().lock(stackScope); lock->requestGcForTesting(); @@ -613,6 +648,7 @@ void requestGc(const Worker& worker) { template kj::Promise addGcPassForTest(IoContext& context, kj::Promise promise) { + TRACE_EVENT("workerd", "Debug: addGcPassForTest"); auto worker = kj::atomicAddRef(context.getWorker()); if constexpr (kj::isSameType()) { co_await promise; diff --git a/src/workerd/util/perfetto-tracing.c++ b/src/workerd/util/perfetto-tracing.c++ index 8b344c3787d..deeae4b7598 100644 --- a/src/workerd/util/perfetto-tracing.c++ +++ b/src/workerd/util/perfetto-tracing.c++ @@ -81,8 +81,8 @@ struct PerfettoSession::Impl { kj::AutoCloseFd fd; std::unique_ptr session; - Impl(int fd, kj::StringPtr categories) - : fd(fd), session(createTracingSession(fd, categories)) { + Impl(kj::AutoCloseFd dest, kj::StringPtr categories) + : fd(kj::mv(dest)), session(createTracingSession(fd.get(), categories)) { session->StartBlocking(); } }; @@ -91,7 +91,7 @@ PerfettoSession::PerfettoSession(kj::StringPtr path, kj::StringPtr categories) : impl(kj::heap(openTraceFile(path), categories)) {} PerfettoSession::PerfettoSession(int fd, kj::StringPtr categories) - : impl(kj::heap(fd, categories)) {} + : impl(kj::heap(kj::AutoCloseFd(fd), categories)) {} PerfettoSession::~PerfettoSession() noexcept(false) { if (impl) { diff --git a/src/workerd/util/perfetto-tracing.h b/src/workerd/util/perfetto-tracing.h index a7796db31c9..178366b4349 100644 --- a/src/workerd/util/perfetto-tracing.h +++ b/src/workerd/util/perfetto-tracing.h @@ -42,10 +42,15 @@ class PerfettoSession { friend constexpr bool _kj_internal_isPolymorphic(PerfettoSession::Impl*); }; +#define PERFETTO_FLOW_FROM_POINTER(ptr) perfetto::Flow::FromPointer(ptr) +#define PERFETTO_TERMINATING_FLOW_FROM_POINTER(ptr) perfetto::TerminatingFlow::FromPointer(ptr) +#define PERFETTO_TRACK_FROM_POINTER(ptr) perfetto::Track::FromPointer(ptr) + KJ_DECLARE_NON_POLYMORPHIC(PerfettoSession::Impl); } // workerd #else // defined(WORKERD_USE_PERFETTO) +struct PerfettoNoop {}; // We define non-op versions of the instrumentation macros here so that we can // still instrument and build when perfetto is not enabled. #define TRACE_EVENT(...) @@ -54,4 +59,7 @@ KJ_DECLARE_NON_POLYMORPHIC(PerfettoSession::Impl); #define TRACE_EVENT_INSTANT(...) #define TRACE_COUNTER(...) #define TRACE_EVENT_CATEGORY_ENABLED(...) false +#define PERFETTO_FLOW_FROM_POINTER(ptr) PerfettoNoop {} +#define PERFETTO_TERMINATING_FLOW_FROM_POINTER(ptr) PerfettoNoop {} +#define PERFETTO_TRACK_FROM_POINTER(ptr) PerfettoNoop {} #endif // defined(WORKERD_USE_PERFETTO)