Skip to content

Commit

Permalink
Initial tail worker support in workerd
Browse files Browse the repository at this point in the history
Adds the ability for a workerd worker configuration to specify
a tail worker configuration. This is useful for testing tail
worker development locally, which up to now has not been possible.
  • Loading branch information
jasnell committed Oct 7, 2024
1 parent ebdcce9 commit c01f16d
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 8 deletions.
25 changes: 25 additions & 0 deletions samples/tail-worker/config.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Workerd = import "/workerd/workerd.capnp";

const tailWorkerExample :Workerd.Config = (
services = [
(name = "main", worker = .helloWorld),
(name = "log", worker = .logWorker),
],

sockets = [ ( name = "http", address = "*:8080", http = (), service = "main" ) ]
);

const helloWorld :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "worker.js")
],
compatibilityDate = "2023-02-28",
logging = ( toService = "log" ),
);

const logWorker :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "tail.js")
],
compatibilityDate = "2023-02-28",
);
9 changes: 9 additions & 0 deletions samples/tail-worker/tail.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) 2017-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export default {
tail(traces) {
console.log(traces);
}
};
9 changes: 9 additions & 0 deletions samples/tail-worker/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) 2017-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export default {
async fetch(req, env) {
return new Response("Hello World\n");
}
};
2 changes: 2 additions & 0 deletions src/workerd/api/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,8 @@ class TraceCustomEventImpl final: public WorkerInterface::CustomEvent {
return typeId;
}

static constexpr uint16_t TYPE = 2;

private:
uint16_t typeId;
kj::Array<kj::Own<workerd::Trace>> traces;
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/trace-streaming.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class StreamingTrace final {
// to be used directly. Use the static create(...) method instead.
explicit StreamingTrace(kj::Own<const IdFactory::Id> id,
trace::Onset&& onset,
Delegate delegatem,
Delegate delegate,
const TimeProvider& timeProvider);
~StreamingTrace() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(StreamingTrace);
Expand Down
210 changes: 203 additions & 7 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <workerd/api/actor-state.h>
#include <workerd/api/analytics-engine.capnp.h>
#include <workerd/api/pyodide/pyodide.h>
#include <workerd/api/trace.h>
#include <workerd/api/worker-rpc.h>
#include <workerd/io/actor-cache.h>
#include <workerd/io/actor-id.h>
Expand Down Expand Up @@ -1441,6 +1442,141 @@ void Server::InspectorServiceIsolateRegistrar::registerIsolate(

// =======================================================================================

namespace {
class RequestObserverWithTracer final: public RequestObserver, public WorkerInterface {
public:
RequestObserverWithTracer(kj::Maybe<kj::Own<WorkerTracer>> tracer): tracer(kj::mv(tracer)) {}
~RequestObserverWithTracer() noexcept(false) {
KJ_IF_SOME(t, tracer) {
if (fetchStatus != 0) {
t->setFetchResponseInfo(trace::FetchResponseInfo(fetchStatus));
}
t->setOutcomeInfo(trace::Outcome(outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS));
}
}

WorkerInterface& wrapWorkerInterface(WorkerInterface& worker) override {
if (tracer != kj::none) {
inner = worker;
return *this;
}
return worker;
}

void reportFailure(const kj::Exception& exception, FailureSource source) override {
outcome = EventOutcome::EXCEPTION;
}

class ResponseObserver final: public kj::HttpService::Response {
public:
ResponseObserver(kj::HttpService::Response& response): inner(response) {}

kj::Own<kj::AsyncOutputStream> send(uint statusCode,
kj::StringPtr statusText,
const kj::HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize) override {
this->statusCode = statusCode;
return inner.send(statusCode, statusText, headers, expectedBodySize);
}

kj::Own<kj::WebSocket> acceptWebSocket(const kj::HttpHeaders& headers) override {
return inner.acceptWebSocket(headers);
}

uint getStatusCode() const {
return statusCode;
}

private:
kj::HttpService::Response& inner;
uint statusCode;
};

// WorkerInterface
kj::Promise<void> request(kj::HttpMethod method,
kj::StringPtr url,
const kj::HttpHeaders& headers,
kj::AsyncInputStream& requestBody,
kj::HttpService::Response& response) override {
try {
ResponseObserver responseWrapper(response);
co_await KJ_ASSERT_NONNULL(inner).request(method, url, headers, requestBody, responseWrapper);
fetchStatus = responseWrapper.getStatusCode();
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
reportFailure(exception, FailureSource::OTHER);
fetchStatus = 500;
throw exception;
}
}

kj::Promise<void> connect(kj::StringPtr host,
const kj::HttpHeaders& headers,
kj::AsyncIoStream& connection,
ConnectResponse& response,
kj::HttpConnectSettings settings) override {
try {
co_return co_await KJ_ASSERT_NONNULL(inner).connect(
host, headers, connection, response, settings);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
reportFailure(exception, FailureSource::OTHER);
throw exception;
}
}

void prewarm(kj::StringPtr url) override {
try {
KJ_ASSERT_NONNULL(inner).prewarm(url);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
reportFailure(exception, FailureSource::OTHER);
throw exception;
}
}

kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
try {
co_return co_await KJ_ASSERT_NONNULL(inner).runScheduled(scheduledTime, cron);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
reportFailure(exception, FailureSource::OTHER);
throw exception;
}
}

kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override {
try {
co_return co_await KJ_ASSERT_NONNULL(inner).runAlarm(scheduledTime, retryCount);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
reportFailure(exception, FailureSource::OTHER);
throw exception;
}
}

kj::Promise<bool> test() override {
try {
co_return co_await KJ_ASSERT_NONNULL(inner).test();
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
reportFailure(exception, FailureSource::OTHER);
throw exception;
}
}

kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override {
return KJ_ASSERT_NONNULL(inner).customEvent(kj::mv(event));
}

private:
kj::Maybe<kj::Own<WorkerTracer>> tracer;
kj::Maybe<WorkerInterface&> inner;
EventOutcome outcome = EventOutcome::OK;
int fetchStatus = 0;
};
} // namespace

class Server::WorkerService final: public Service,
private kj::TaskSet::ErrorHandler,
private IoChannelFactory,
Expand All @@ -1459,20 +1595,28 @@ public:
};
using LinkCallback = kj::Function<LinkedIoChannels(WorkerService&)>;
using AbortActorsCallback = kj::Function<void()>;
using LookupServiceCallback =
kj::Function<Service&(config::ServiceDesignator::Reader, kj::StringPtr)>;

WorkerService(ThreadContext& threadContext,
kj::Own<const Worker> worker,
kj::Maybe<kj::HashSet<kj::String>> defaultEntrypointHandlers,
kj::HashMap<kj::String, kj::HashSet<kj::String>> namedEntrypointsParam,
const kj::HashMap<kj::String, ActorConfig>& actorClasses,
LinkCallback linkCallback,
AbortActorsCallback abortActorsCallback)
AbortActorsCallback abortActorsCallback,
kj::Maybe<kj::Own<PipelineTracer>> maybeTracer,
kj::Array<kj::Own<config::ServiceDesignator::Reader>> loggingServices,
LookupServiceCallback lookupService)
: threadContext(threadContext),
ioChannels(kj::mv(linkCallback)),
worker(kj::mv(worker)),
defaultEntrypointHandlers(kj::mv(defaultEntrypointHandlers)),
waitUntilTasks(*this),
abortActorsCallback(kj::mv(abortActorsCallback)) {
abortActorsCallback(kj::mv(abortActorsCallback)),
maybeTracer(kj::mv(maybeTracer)),
loggingServices(kj::mv(loggingServices)),
lookupService(kj::mv(lookupService)) {

namedEntrypoints.reserve(namedEntrypointsParam.size());
for (auto& ep: namedEntrypointsParam) {
Expand Down Expand Up @@ -1530,15 +1674,38 @@ public:
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<kj::Own<Worker::Actor>> actor = kj::none) {
TRACE_EVENT("workerd", "Server::WorkerService::startRequest()");
kj::Maybe<kj::Own<WorkerTracer>> maybeWorkerTracer = kj::none;
KJ_IF_SOME(tracer, maybeTracer) {
auto childTracer = kj::refcounted<PipelineTracer>(kj::addRef(*tracer));
maybeWorkerTracer = childTracer->makeWorkerTracer(PipelineLogLevel::FULL, kj::none, kj::none,
kj::none, kj::none, kj::none, nullptr, kj::none);

kj::Vector<kj::Own<WorkerInterface>> tailWorkers;
for (auto& svc: loggingServices) {
auto& service = lookupService(*svc, "looking logging service");
KJ_ASSERT(&service != this, "A worker currently cannot log to itself");
tailWorkers.add(service.startRequest({}));
}
waitUntilTasks.add(childTracer->onComplete().then(
[this, tailWorkers = kj::mv(tailWorkers)](
kj::Array<kj::Own<Trace>> traces) mutable -> kj::Promise<void> {
for (auto& worker: tailWorkers) {
auto event = kj::heap<workerd::api::TraceCustomEventImpl>(
workerd::api::TraceCustomEventImpl::TYPE, waitUntilTasks, mapAddRef(traces));
co_return co_await worker->customEvent(kj::mv(event)).ignoreResult();
}
co_return;
}));
};
return newWorkerEntrypoint(threadContext, kj::atomicAddRef(*worker), entrypointName,
kj::mv(actor), kj::Own<LimitEnforcer>(this, kj::NullDisposer::instance),
{}, // ioContextDependency
kj::Own<IoChannelFactory>(this, kj::NullDisposer::instance),
kj::refcounted<RequestObserver>(), // default observer makes no observations
kj::refcounted<RequestObserverWithTracer>(
mapAddRef(maybeWorkerTracer)), // default observer makes no observations
waitUntilTasks,
true, // tunnelExceptions
kj::none, // workerTracer
kj::mv(metadata.cfBlobJson));
true, // tunnelExceptions
kj::mv(maybeWorkerTracer), kj::mv(metadata.cfBlobJson));
}

class ActorNamespace final {
Expand Down Expand Up @@ -2060,6 +2227,9 @@ private:
kj::HashMap<kj::StringPtr, kj::Own<ActorNamespace>> actorNamespaces;
kj::TaskSet waitUntilTasks;
AbortActorsCallback abortActorsCallback;
kj::Maybe<kj::Own<PipelineTracer>> maybeTracer;
kj::Array<kj::Own<config::ServiceDesignator::Reader>> loggingServices;
LookupServiceCallback lookupService;

class ActorChannelImpl final: public IoChannelFactory::ActorChannel {
public:
Expand Down Expand Up @@ -3065,9 +3235,35 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name,
return result;
};

kj::Vector<kj::Own<config::ServiceDesignator::Reader>> loggingServices;
auto maybeMakeTracer = [conf, &loggingServices]() -> kj::Maybe<kj::Own<PipelineTracer>> {
auto logging = conf.getLogging();
switch (logging.which()) {
case config::Worker::Logging::Which::NONE:
return kj::none;
case config::Worker::Logging::Which::TO_SERVICE: {
loggingServices.add(capnp::clone(logging.getToService()));
break;
}
case config::Worker::Logging::Which::TO_SERVICES: {
for (auto svc: logging.getToServices()) {
loggingServices.add(capnp::clone(svc));
}
break;
}
}

return kj::refcounted<PipelineTracer>(kj::none);
};
auto tracer = maybeMakeTracer();

return kj::heap<WorkerService>(globalContext->threadContext, kj::mv(worker),
kj::mv(errorReporter.defaultEntrypoint), kj::mv(errorReporter.namedEntrypoints),
localActorConfigs, kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors));
localActorConfigs, kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors),
kj::mv(tracer), loggingServices.releaseAsArray(),
[this](const config::ServiceDesignator::Reader& service, kj::StringPtr context) -> Service& {
return lookupService(service, kj::str(context));
});
}

// =======================================================================================
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/server/workerd.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@ struct Worker {

moduleFallback @13 :Text;

logging :union {
none @14 :Void;
toService @15 :ServiceDesignator;
toServices @16 :List(ServiceDesignator);
}
}

struct ExternalServer {
Expand Down

0 comments on commit c01f16d

Please sign in to comment.