diff --git a/samples/tail-worker/config.capnp b/samples/tail-worker/config.capnp new file mode 100644 index 00000000000..b981e8024fd --- /dev/null +++ b/samples/tail-worker/config.capnp @@ -0,0 +1,25 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const tailWorkerExample :Workerd.Config = ( + services = [ + (name = "main", worker = .helloWorld), + (name = "log", worker = .logWorker), + ], + + sockets = [ ( name = "http", address = "*:8080", http = (), service = "main" ) ] +); + +const helloWorld :Workerd.Worker = ( + modules = [ + (name = "worker", esModule = embed "worker.js") + ], + compatibilityDate = "2023-02-28", + logging = ( toService = "log" ), +); + +const logWorker :Workerd.Worker = ( + modules = [ + (name = "worker", esModule = embed "tail.js") + ], + compatibilityDate = "2023-02-28", +); diff --git a/samples/tail-worker/tail.js b/samples/tail-worker/tail.js new file mode 100644 index 00000000000..bfdf7154d5f --- /dev/null +++ b/samples/tail-worker/tail.js @@ -0,0 +1,9 @@ +// Copyright (c) 2017-2023 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +export default { + tail(traces) { + console.log(traces); + } +}; diff --git a/samples/tail-worker/worker.js b/samples/tail-worker/worker.js new file mode 100644 index 00000000000..34041aa64b9 --- /dev/null +++ b/samples/tail-worker/worker.js @@ -0,0 +1,9 @@ +// Copyright (c) 2017-2023 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +export default { + async fetch(req, env) { + return new Response("Hello World\n"); + } +}; diff --git a/src/workerd/api/trace.h b/src/workerd/api/trace.h index 04056f44a28..6770a836f2d 100644 --- a/src/workerd/api/trace.h +++ b/src/workerd/api/trace.h @@ -622,6 +622,8 @@ class TraceCustomEventImpl final: public WorkerInterface::CustomEvent { return typeId; } + static constexpr uint16_t TYPE = 2; + private: uint16_t typeId; kj::Array> traces; diff --git a/src/workerd/io/trace-streaming.h b/src/workerd/io/trace-streaming.h index 719f227754e..78dc085c481 100644 --- a/src/workerd/io/trace-streaming.h +++ b/src/workerd/io/trace-streaming.h @@ -147,7 +147,7 @@ class StreamingTrace final { // to be used directly. Use the static create(...) method instead. explicit StreamingTrace(kj::Own id, trace::Onset&& onset, - Delegate delegatem, + Delegate delegate, const TimeProvider& timeProvider); ~StreamingTrace() noexcept(false); KJ_DISALLOW_COPY_AND_MOVE(StreamingTrace); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index b0f05243b57..e8ea95d6f42 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -1441,6 +1442,141 @@ void Server::InspectorServiceIsolateRegistrar::registerIsolate( // ======================================================================================= +namespace { +class RequestObserverWithTracer final: public RequestObserver, public WorkerInterface { +public: + RequestObserverWithTracer(kj::Maybe> tracer): tracer(kj::mv(tracer)) {} + ~RequestObserverWithTracer() noexcept(false) { + KJ_IF_SOME(t, tracer) { + if (fetchStatus != 0) { + t->setFetchResponseInfo(trace::FetchResponseInfo(fetchStatus)); + } + t->setOutcomeInfo(trace::Outcome(outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS)); + } + } + + WorkerInterface& wrapWorkerInterface(WorkerInterface& worker) override { + if (tracer != kj::none) { + inner = worker; + return *this; + } + return worker; + } + + void reportFailure(const kj::Exception& exception, FailureSource source) override { + outcome = EventOutcome::EXCEPTION; + } + + class ResponseObserver final: public kj::HttpService::Response { + public: + ResponseObserver(kj::HttpService::Response& response): inner(response) {} + + kj::Own send(uint statusCode, + kj::StringPtr statusText, + const kj::HttpHeaders& headers, + kj::Maybe expectedBodySize) override { + this->statusCode = statusCode; + return inner.send(statusCode, statusText, headers, expectedBodySize); + } + + kj::Own acceptWebSocket(const kj::HttpHeaders& headers) override { + return inner.acceptWebSocket(headers); + } + + uint getStatusCode() const { + return statusCode; + } + + private: + kj::HttpService::Response& inner; + uint statusCode; + }; + + // WorkerInterface + kj::Promise request(kj::HttpMethod method, + kj::StringPtr url, + const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, + kj::HttpService::Response& response) override { + try { + ResponseObserver responseWrapper(response); + co_await KJ_ASSERT_NONNULL(inner).request(method, url, headers, requestBody, responseWrapper); + fetchStatus = responseWrapper.getStatusCode(); + } catch (...) { + auto exception = kj::getCaughtExceptionAsKj(); + reportFailure(exception, FailureSource::OTHER); + fetchStatus = 500; + throw exception; + } + } + + kj::Promise connect(kj::StringPtr host, + const kj::HttpHeaders& headers, + kj::AsyncIoStream& connection, + ConnectResponse& response, + kj::HttpConnectSettings settings) override { + try { + co_return co_await KJ_ASSERT_NONNULL(inner).connect( + host, headers, connection, response, settings); + } catch (...) { + auto exception = kj::getCaughtExceptionAsKj(); + reportFailure(exception, FailureSource::OTHER); + throw exception; + } + } + + void prewarm(kj::StringPtr url) override { + try { + KJ_ASSERT_NONNULL(inner).prewarm(url); + } catch (...) { + auto exception = kj::getCaughtExceptionAsKj(); + reportFailure(exception, FailureSource::OTHER); + throw exception; + } + } + + kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { + try { + co_return co_await KJ_ASSERT_NONNULL(inner).runScheduled(scheduledTime, cron); + } catch (...) { + auto exception = kj::getCaughtExceptionAsKj(); + reportFailure(exception, FailureSource::OTHER); + throw exception; + } + } + + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { + try { + co_return co_await KJ_ASSERT_NONNULL(inner).runAlarm(scheduledTime, retryCount); + } catch (...) { + auto exception = kj::getCaughtExceptionAsKj(); + reportFailure(exception, FailureSource::OTHER); + throw exception; + } + } + + kj::Promise test() override { + try { + co_return co_await KJ_ASSERT_NONNULL(inner).test(); + } catch (...) { + auto exception = kj::getCaughtExceptionAsKj(); + reportFailure(exception, FailureSource::OTHER); + throw exception; + } + } + + kj::Promise customEvent(kj::Own event) override { + return KJ_ASSERT_NONNULL(inner).customEvent(kj::mv(event)); + } + +private: + kj::Maybe> tracer; + kj::Maybe inner; + EventOutcome outcome = EventOutcome::OK; + int fetchStatus = 0; +}; +} // namespace + class Server::WorkerService final: public Service, private kj::TaskSet::ErrorHandler, private IoChannelFactory, @@ -1459,6 +1595,8 @@ public: }; using LinkCallback = kj::Function; using AbortActorsCallback = kj::Function; + using LookupServiceCallback = + kj::Function; WorkerService(ThreadContext& threadContext, kj::Own worker, @@ -1466,13 +1604,19 @@ public: kj::HashMap> namedEntrypointsParam, const kj::HashMap& actorClasses, LinkCallback linkCallback, - AbortActorsCallback abortActorsCallback) + AbortActorsCallback abortActorsCallback, + kj::Maybe> maybeTracer, + kj::Array> loggingServices, + LookupServiceCallback lookupService) : threadContext(threadContext), ioChannels(kj::mv(linkCallback)), worker(kj::mv(worker)), defaultEntrypointHandlers(kj::mv(defaultEntrypointHandlers)), waitUntilTasks(*this), - abortActorsCallback(kj::mv(abortActorsCallback)) { + abortActorsCallback(kj::mv(abortActorsCallback)), + maybeTracer(kj::mv(maybeTracer)), + loggingServices(kj::mv(loggingServices)), + lookupService(kj::mv(lookupService)) { namedEntrypoints.reserve(namedEntrypointsParam.size()); for (auto& ep: namedEntrypointsParam) { @@ -1530,15 +1674,38 @@ public: kj::Maybe entrypointName, kj::Maybe> actor = kj::none) { TRACE_EVENT("workerd", "Server::WorkerService::startRequest()"); + kj::Maybe> maybeWorkerTracer = kj::none; + KJ_IF_SOME(tracer, maybeTracer) { + auto childTracer = kj::refcounted(kj::addRef(*tracer)); + maybeWorkerTracer = childTracer->makeWorkerTracer(PipelineLogLevel::FULL, kj::none, kj::none, + kj::none, kj::none, kj::none, nullptr, kj::none); + + kj::Vector> tailWorkers; + for (auto& svc: loggingServices) { + auto& service = lookupService(*svc, "looking logging service"); + KJ_ASSERT(&service != this, "A worker currently cannot log to itself"); + tailWorkers.add(service.startRequest({})); + } + waitUntilTasks.add(childTracer->onComplete().then( + [this, tailWorkers = kj::mv(tailWorkers)]( + kj::Array> traces) mutable -> kj::Promise { + for (auto& worker: tailWorkers) { + auto event = kj::heap( + workerd::api::TraceCustomEventImpl::TYPE, waitUntilTasks, mapAddRef(traces)); + co_return co_await worker->customEvent(kj::mv(event)).ignoreResult(); + } + co_return; + })); + }; return newWorkerEntrypoint(threadContext, kj::atomicAddRef(*worker), entrypointName, kj::mv(actor), kj::Own(this, kj::NullDisposer::instance), {}, // ioContextDependency kj::Own(this, kj::NullDisposer::instance), - kj::refcounted(), // default observer makes no observations + kj::refcounted( + mapAddRef(maybeWorkerTracer)), // default observer makes no observations waitUntilTasks, - true, // tunnelExceptions - kj::none, // workerTracer - kj::mv(metadata.cfBlobJson)); + true, // tunnelExceptions + kj::mv(maybeWorkerTracer), kj::mv(metadata.cfBlobJson)); } class ActorNamespace final { @@ -2060,6 +2227,9 @@ private: kj::HashMap> actorNamespaces; kj::TaskSet waitUntilTasks; AbortActorsCallback abortActorsCallback; + kj::Maybe> maybeTracer; + kj::Array> loggingServices; + LookupServiceCallback lookupService; class ActorChannelImpl final: public IoChannelFactory::ActorChannel { public: @@ -3065,9 +3235,35 @@ kj::Own Server::makeWorker(kj::StringPtr name, return result; }; + kj::Vector> loggingServices; + auto maybeMakeTracer = [conf, &loggingServices]() -> kj::Maybe> { + auto logging = conf.getLogging(); + switch (logging.which()) { + case config::Worker::Logging::Which::NONE: + return kj::none; + case config::Worker::Logging::Which::TO_SERVICE: { + loggingServices.add(capnp::clone(logging.getToService())); + break; + } + case config::Worker::Logging::Which::TO_SERVICES: { + for (auto svc: logging.getToServices()) { + loggingServices.add(capnp::clone(svc)); + } + break; + } + } + + return kj::refcounted(kj::none); + }; + auto tracer = maybeMakeTracer(); + return kj::heap(globalContext->threadContext, kj::mv(worker), kj::mv(errorReporter.defaultEntrypoint), kj::mv(errorReporter.namedEntrypoints), - localActorConfigs, kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors)); + localActorConfigs, kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), + kj::mv(tracer), loggingServices.releaseAsArray(), + [this](const config::ServiceDesignator::Reader& service, kj::StringPtr context) -> Service& { + return lookupService(service, kj::str(context)); + }); } // ======================================================================================= diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index 4a2e5794735..d26cd10e29b 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -630,6 +630,11 @@ struct Worker { moduleFallback @13 :Text; + logging :union { + none @14 :Void; + toService @15 :ServiceDesignator; + toServices @16 :List(ServiceDesignator); + } } struct ExternalServer {