Skip to content

Commit

Permalink
Provide ExecutionModel enum to tail worker
Browse files Browse the repository at this point in the history
  • Loading branch information
fhanau committed Oct 11, 2024
1 parent 603a2be commit 4482168
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 12 deletions.
16 changes: 11 additions & 5 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,11 @@ jsg::Optional<kj::Array<kj::String>> getTraceScriptTags(const Trace& trace) {
}
}

kj::String getTraceOutcome(const Trace& trace) {
// TODO(cleanup): Add to enumToStr() to capnp?
auto enums = capnp::Schema::from<EventOutcome>().getEnumerants();
uint i = static_cast<uint>(trace.outcome);
template <typename Enum>
kj::String enumToStr(const Enum& var) {
// TODO(cleanup): Port this to capnproto.
auto enums = capnp::Schema::from<Enum>().getEnumerants();
uint i = static_cast<uint>(var);
KJ_ASSERT(i < enums.size(), "invalid outcome");
return kj::str(enums[i].getProto().getName());
}
Expand Down Expand Up @@ -203,7 +204,8 @@ TraceItem::TraceItem(jsg::Lock& js, const Trace& trace)
scriptVersion(getTraceScriptVersion(trace)),
dispatchNamespace(trace.dispatchNamespace.map([](auto& ns) { return kj::str(ns); })),
scriptTags(getTraceScriptTags(trace)),
outcome(getTraceOutcome(trace)),
executionModel(enumToStr(trace.executionModel)),
outcome(enumToStr(trace.outcome)),
cpuTime(trace.cpuTime / kj::MILLISECONDS),
wallTime(trace.wallTime / kj::MILLISECONDS),
truncated(trace.truncated) {}
Expand Down Expand Up @@ -280,6 +282,10 @@ jsg::Optional<kj::Array<kj::StringPtr>> TraceItem::getScriptTags() {
[](kj::Array<kj::String>& tags) { return KJ_MAP(t, tags) -> kj::StringPtr { return t; }; });
}

kj::StringPtr TraceItem::getExecutionModel() {
return executionModel;
}

kj::StringPtr TraceItem::getOutcome() {
return outcome;
}
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/api/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class TraceItem final: public jsg::Object {
jsg::Optional<ScriptVersion> getScriptVersion();
jsg::Optional<kj::StringPtr> getDispatchNamespace();
jsg::Optional<kj::Array<kj::StringPtr>> getScriptTags();
kj::StringPtr getExecutionModel();
kj::StringPtr getOutcome();

uint getCpuTime();
Expand All @@ -119,6 +120,7 @@ class TraceItem final: public jsg::Object {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(dispatchNamespace, getDispatchNamespace);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(scriptTags, getScriptTags);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(outcome, getOutcome);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(executionModel, getExecutionModel);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(truncated, getTruncated);
}

Expand All @@ -135,6 +137,7 @@ class TraceItem final: public jsg::Object {
kj::Maybe<ScriptVersion> scriptVersion;
kj::Maybe<kj::String> dispatchNamespace;
jsg::Optional<kj::Array<kj::String>> scriptTags;
kj::String executionModel;
kj::String outcome;
uint cpuTime;
uint wallTime;
Expand Down
16 changes: 11 additions & 5 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,16 @@ Trace::Trace(kj::Maybe<kj::String> stableId,
kj::Maybe<kj::String> dispatchNamespace,
kj::Maybe<kj::String> scriptId,
kj::Array<kj::String> scriptTags,
kj::Maybe<kj::String> entrypoint)
kj::Maybe<kj::String> entrypoint,
ExecutionModel executionModel)
: stableId(kj::mv(stableId)),
scriptName(kj::mv(scriptName)),
scriptVersion(kj::mv(scriptVersion)),
dispatchNamespace(kj::mv(dispatchNamespace)),
scriptId(kj::mv(scriptId)),
scriptTags(kj::mv(scriptTags)),
entrypoint(kj::mv(entrypoint)) {}
entrypoint(kj::mv(entrypoint)),
executionModel(executionModel) {}
Trace::Trace(rpc::Trace::Reader reader) {
mergeFrom(reader, PipelineLogLevel::FULL);
}
Expand Down Expand Up @@ -311,6 +313,7 @@ void Trace::copyTo(rpc::Trace::Builder builder) {
KJ_IF_SOME(ns, dispatchNamespace) {
builder.setDispatchNamespace(ns);
}
builder.setExecutionModel(executionModel);

{
auto list = builder.initScriptTags(scriptTags.size());
Expand Down Expand Up @@ -430,6 +433,7 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev
if (reader.hasDispatchNamespace()) {
dispatchNamespace = kj::str(reader.getDispatchNamespace());
}
executionModel = reader.getExecutionModel();

if (auto tags = reader.getScriptTags(); tags.size() > 0) {
scriptTags = KJ_MAP(tag, tags) { return kj::str(tag); };
Expand Down Expand Up @@ -574,6 +578,7 @@ kj::Promise<kj::Array<kj::Own<Trace>>> PipelineTracer::onComplete() {
}

kj::Own<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
ExecutionModel executionModel,
kj::Maybe<kj::String> scriptId,
kj::Maybe<kj::String> stableId,
kj::Maybe<kj::String> scriptName,
Expand All @@ -582,7 +587,8 @@ kj::Own<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline
kj::Array<kj::String> scriptTags,
kj::Maybe<kj::String> entrypoint) {
auto trace = kj::refcounted<Trace>(kj::mv(stableId), kj::mv(scriptName), kj::mv(scriptVersion),
kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint));
kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint),
executionModel);
traces.add(kj::addRef(*trace));
return kj::refcounted<WorkerTracer>(kj::addRef(*this), kj::mv(trace), pipelineLogLevel);
}
Expand All @@ -597,10 +603,10 @@ WorkerTracer::WorkerTracer(
trace(kj::mv(trace)),
parentPipeline(kj::mv(parentPipeline)),
self(kj::refcounted<WeakRef<WorkerTracer>>(kj::Badge<WorkerTracer>{}, *this)) {}
WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel)
WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel executionModel)
: pipelineLogLevel(pipelineLogLevel),
trace(kj::refcounted<Trace>(
kj::none, kj::none, kj::none, kj::none, kj::none, nullptr, kj::none)),
kj::none, kj::none, kj::none, kj::none, kj::none, nullptr, kj::none, executionModel)),
self(kj::refcounted<WeakRef<WorkerTracer>>(kj::Badge<WorkerTracer>{}, *this)) {}

void WorkerTracer::log(kj::Date timestamp, LogLevel logLevel, kj::String message, bool isSpan) {
Expand Down
8 changes: 6 additions & 2 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ using kj::byte;
using kj::uint;

typedef rpc::Trace::Log::Level LogLevel;
typedef rpc::Trace::ExecutionModel ExecutionModel;

enum class PipelineLogLevel {
// WARNING: This must be kept in sync with PipelineDef::LogLevel (which is not in the OSS
Expand Down Expand Up @@ -61,7 +62,8 @@ class Trace final: public kj::Refcounted {
kj::Maybe<kj::String> dispatchNamespace,
kj::Maybe<kj::String> scriptId,
kj::Array<kj::String> scriptTags,
kj::Maybe<kj::String> entrypoint);
kj::Maybe<kj::String> entrypoint,
ExecutionModel executionModel);
Trace(rpc::Trace::Reader reader);
~Trace() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(Trace);
Expand Down Expand Up @@ -300,6 +302,7 @@ class Trace final: public kj::Refcounted {

kj::Vector<DiagnosticChannelEvent> diagnosticChannelEvents;

ExecutionModel executionModel;
EventOutcome outcome = EventOutcome::UNKNOWN;

kj::Maybe<FetchResponseInfo> fetchResponseInfo;
Expand Down Expand Up @@ -352,6 +355,7 @@ class PipelineTracer final: public kj::Refcounted {

// Makes a tracer for a worker stage.
kj::Own<WorkerTracer> makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
ExecutionModel executionModel,
kj::Maybe<kj::String> scriptId,
kj::Maybe<kj::String> stableId,
kj::Maybe<kj::String> scriptName,
Expand Down Expand Up @@ -382,7 +386,7 @@ class WorkerTracer final: public kj::Refcounted {
explicit WorkerTracer(kj::Own<PipelineTracer> parentPipeline,
kj::Own<Trace> trace,
PipelineLogLevel pipelineLogLevel);
explicit WorkerTracer(PipelineLogLevel pipelineLogLevel);
explicit WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel executionModel);
~WorkerTracer() {
self->invalidate();
}
Expand Down
8 changes: 8 additions & 0 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ struct Trace @0x8e8d911203762d34 {
message @2 :Data;
}

enum ExecutionModel {
stateless @0;
durableObject @1;
workflow @2;
}
executionModel @25 :ExecutionModel;
# the execution model of the worker being traced. Can be stateless for a regular worker,
# durableObject for a DO worker or workflow for the upcoming Workflows.
truncated @24 :Bool;
# Indicates that the trace was truncated due to reaching the maximum size limit.
}
Expand Down

0 comments on commit 4482168

Please sign in to comment.