Skip to content

Commit

Permalink
[o11y] Add convenience functions to add span tags for subrequest
Browse files Browse the repository at this point in the history
- This allows adding initial tags for KV and R2. Also add lw:top_level_execution
  span.
  • Loading branch information
fhanau committed Oct 28, 2024
1 parent e664e3e commit 8b5822e
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/workerd/api/kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
KJ_UNREACHABLE;
}();

auto client = context.getHttpClient(subrequestChannel, true, kj::none, operationName);
auto client = context.getHttpClientWithSpans(
subrequestChannel, true, kj::none, operationName, {{"db.system"_kjc, "cloudflare-kv"_kjc}});
headers.add(FLPROD_405_HEADER, urlStr);
for (const auto& header: additionalHeaders) {
headers.add(header.name.asPtr(), header.value.asPtr());
Expand Down
11 changes: 8 additions & 3 deletions src/workerd/api/r2-admin.c++
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ jsg::Ref<R2Bucket> R2Admin::get(jsg::Lock& js, kj::String bucketName) {
jsg::Promise<jsg::Ref<R2Bucket>> R2Admin::create(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
auto client = context.getHttpClient(subrequestChannel, true, kj::none, "r2_create"_kjc);
// TODO(o11y): Add cloudflare.r2.bucket here.
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_create"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "CreateBucket"_kjc}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -57,7 +59,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType,
CompatibilityFlags::Reader flags) {
auto& context = IoContext::current();
auto client = context.getHttpClient(subrequestChannel, true, kj::none, "r2_list"_kjc);
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_list"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "ListObjects"_kjc}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -113,7 +116,9 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
jsg::Promise<void> R2Admin::delete_(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
auto client = context.getHttpClient(subrequestChannel, true, kj::none, "r2_delete"_kjc);
// TODO(o11y): Add cloudflare.r2.bucket
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_delete"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "DeleteBucket"_kjc}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
29 changes: 29 additions & 0 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,26 @@ kj::Own<WorkerInterface> IoContext::getSubrequestChannel(
});
}

kj::Own<WorkerInterface> IoContext::getSubrequestChannelSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags) {
return getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& channelFactory) {
for (const SpanTagParams& tag: tags) {
tracing.userSpan.setTag(kj::mv(tag.key), kj::str(tag.value));
}
return getSubrequestChannelImpl(
channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory);
},
SubrequestOptions{
.inHouse = isInHouse,
.wrapMetrics = !isInHouse,
.operationName = kj::mv(operationName),
});
}

kj::Own<WorkerInterface> IoContext::getSubrequestChannelNoChecks(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
Expand Down Expand Up @@ -871,6 +891,15 @@ kj::Own<kj::HttpClient> IoContext::getHttpClient(
getSubrequestChannel(channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName)));
}

kj::Own<kj::HttpClient> IoContext::getHttpClientWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags) {
return asHttpClient(getSubrequestChannelSpans(
channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName), kj::mv(tags)));
}

kj::Own<kj::HttpClient> IoContext::getHttpClientNoChecks(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
Expand Down
23 changes: 23 additions & 0 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <kj/function.h>
#include <kj/mutex.h>

#include <initializer_list>

namespace workerd {
class LimitEnforcer;
}
Expand Down Expand Up @@ -696,6 +698,20 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName);

// As above, but with list of span tags to add.
// TODO(o11y): For now this only supports literal values based on initializer_list constraints.
// Add syntactic sugar to kj::vector so that we can pass in a vector more ergonomically and use
// that instead to support other value types.
struct SpanTagParams {
kj::LiteralStringConst key;
kj::LiteralStringConst value;
};
kj::Own<WorkerInterface> getSubrequestChannelSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags);

// Like getSubrequestChannel() but doesn't enforce limits. Use for trusted paths only.
kj::Own<WorkerInterface> getSubrequestChannelNoChecks(uint channel,
bool isInHouse,
Expand All @@ -709,6 +725,13 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName);

// As above, but with list of span tags to add, analogous to getSubrequestChannelSpans().
kj::Own<kj::HttpClient> getHttpClientWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags);

// Convenience methods that call getSubrequest*() and adapt the returned WorkerInterface objects
// to HttpClient.
kj::Own<kj::HttpClient> getHttpClientNoChecks(uint channel,
Expand Down
6 changes: 4 additions & 2 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1551,7 +1551,7 @@ Worker::Worker(kj::Own<const Script> scriptParam,
kj::FunctionParam<void(jsg::Lock& lock, const Api& api, v8::Local<v8::Object> target)>
compileBindings,
IsolateObserver::StartType startType,
SpanParent parentSpan,
TraceParentContext spans,
LockType lockType,
kj::Maybe<ValidationErrorReporter&> errorReporter,
kj::Maybe<kj::Duration&> startupTime)
Expand All @@ -1574,7 +1574,7 @@ Worker::Worker(kj::Own<const Script> scriptParam,
});

auto maybeMakeSpan = [&](auto operationName) -> SpanBuilder {
auto span = parentSpan.newChild(kj::mv(operationName));
auto span = spans.parentSpan.newChild(kj::mv(operationName));
if (span.isObserved()) {
span.setTag("truncated_script_id"_kjc, truncateScriptId(script->getId()));
}
Expand Down Expand Up @@ -1641,6 +1641,8 @@ Worker::Worker(kj::Own<const Script> scriptParam,

// Execute script.
currentSpan = maybeMakeSpan("lw:top_level_execution"_kjc);
SpanBuilder currentUserSpan =
spans.userParentSpan.newChild("lw:top_level_execution"_kjc);

KJ_SWITCH_ONEOF(script->impl->unboundScriptOrMainModule) {
KJ_CASE_ONEOF(unboundScript, jsg::NonModuleScript) {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class Worker: public kj::AtomicRefcounted {
kj::FunctionParam<void(jsg::Lock& lock, const Api& api, v8::Local<v8::Object> target)>
compileBindings,
IsolateObserver::StartType startType,
SpanParent parentSpan,
TraceParentContext spans,
LockType lockType,
kj::Maybe<ValidationErrorReporter&> errorReporter = kj::none,
kj::Maybe<kj::Duration&> startupTime = kj::none);
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2972,7 +2972,7 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name,
[&](jsg::Lock& lock, const Worker::Api& api, v8::Local<v8::Object> target) {
return WorkerdApi::from(api).compileGlobals(lock, globals, target, 1);
}, IsolateObserver::StartType::COLD,
nullptr, // systemTracer -- TODO(beta): factor out
TraceParentContext(nullptr, nullptr), // systemTracer -- TODO(beta): factor out
Worker::Lock::TakeSynchronously(kj::none), errorReporter);

{
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ TestFixture::TestFixture(SetupParams&& params)
// no bindings, nothing to do
},
IsolateObserver::StartType::COLD,
nullptr /* parentSpan */,
TraceParentContext(nullptr, nullptr), /* spans */
Worker::LockType(Worker::Lock::TakeSynchronously(kj::none)))),
errorHandler(kj::heap<DummyErrorHandler>()),
waitUntilTasks(*errorHandler),
Expand Down

0 comments on commit 8b5822e

Please sign in to comment.