Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(profiling): enable endpoint profiling using libdatadog exporter #10649

Merged
merged 48 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
704ed0b
this should be enabled by default
taegyunkim Sep 12, 2024
23fd30a
fix(profiling): fix endpoint profiling for stack v2
taegyunkim Sep 12, 2024
1706a75
format
taegyunkim Sep 12, 2024
a5ec7f7
Discard changes to tests/profiling/collector/test_stack.py
taegyunkim Sep 12, 2024
c960dd6
call ffi apis
taegyunkim Sep 13, 2024
ed2b6a0
fix test
taegyunkim Sep 13, 2024
37f1c83
more guards
taegyunkim Sep 13, 2024
3956326
remove unused export key
taegyunkim Sep 13, 2024
8b008e1
does this work?
taegyunkim Sep 13, 2024
d19bbce
format
taegyunkim Sep 13, 2024
530e062
use uint64_t
taegyunkim Sep 13, 2024
22f5483
add tags
taegyunkim Sep 13, 2024
fe30b26
propagate 'trace endpoint'
taegyunkim Sep 13, 2024
2505fc9
Revert "propagate 'trace endpoint'"
taegyunkim Sep 13, 2024
8d84b3a
try to mimick what py exporter does
taegyunkim Sep 17, 2024
438a2e6
debug print
taegyunkim Sep 17, 2024
652b1c9
update code
taegyunkim Sep 17, 2024
ac7040d
print span ids
taegyunkim Sep 17, 2024
e9cddf4
use set
taegyunkim Sep 18, 2024
59dd7d5
debug prints
taegyunkim Sep 18, 2024
da1544a
more debug prints
taegyunkim Sep 18, 2024
7263c38
thread name print
taegyunkim Sep 18, 2024
a4f24b6
also print thread name
taegyunkim Sep 18, 2024
8dc9595
maybe this is needed to propagate information for fork-based frameworks?
taegyunkim Sep 18, 2024
940bcdc
remove checks
taegyunkim Sep 18, 2024
3c6940f
typo
taegyunkim Sep 18, 2024
8dfc734
Revert "maybe this is needed to propagate information for fork-based …
taegyunkim Sep 18, 2024
f1b731d
simplify
taegyunkim Sep 19, 2024
ff24841
rename functions
taegyunkim Sep 19, 2024
ad3539e
change type
taegyunkim Sep 19, 2024
b2898d5
remove debug prints from ddup.pyx
taegyunkim Sep 19, 2024
78aa829
remove debug prints from stack.pyx and _threading.pyx
taegyunkim Sep 19, 2024
5331168
simplify push_span function signature
taegyunkim Sep 19, 2024
2578d60
simplify ddup.upload()
taegyunkim Sep 19, 2024
229b15f
minor edits
taegyunkim Sep 19, 2024
1ebf83d
remove newline
taegyunkim Sep 19, 2024
7e40a94
fix test
taegyunkim Sep 19, 2024
12d3201
revert change
taegyunkim Sep 19, 2024
ab1b66f
remove unused import
taegyunkim Sep 19, 2024
a6bf435
comments
taegyunkim Sep 19, 2024
6eb49ec
simplify code in ddup_interface
taegyunkim Sep 19, 2024
d7f4f69
reorder functions
taegyunkim Sep 19, 2024
df61879
test for non web span
taegyunkim Sep 19, 2024
a87712e
no need to check for enabled, as it will return empty
taegyunkim Sep 19, 2024
24c431f
minimal testing for trace endpoint
taegyunkim Sep 19, 2024
750bcbe
relnotes
taegyunkim Sep 19, 2024
b89de93
avoid frequent lock acquire and release
taegyunkim Sep 20, 2024
80eabe1
Merge branch 'main' into taegyunkim/endpoint-test
taegyunkim Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <map>
#include <stddef.h>
#include <stdint.h>
#include <string_view>
Expand Down Expand Up @@ -31,6 +32,8 @@ extern "C"
bool ddup_is_initialized();
void ddup_start();
void ddup_set_runtime_id(std::string_view runtime_id);
void ddup_profile_set_endpoints(std::map<int64_t, std::string_view> span_ids_to_endpoints);
void ddup_profile_add_endpoint_counts(std::map<std::string_view, int64_t> trace_endpoints_to_counts);
bool ddup_upload();

// Proxy functions to the underlying sample
Expand All @@ -48,10 +51,9 @@ extern "C"
std::string_view thread_name);
void ddup_push_task_id(Datadog::Sample* sample, int64_t task_id);
void ddup_push_task_name(Datadog::Sample* sample, std::string_view task_name);
void ddup_push_span_id(Datadog::Sample* sample, int64_t span_id);
void ddup_push_local_root_span_id(Datadog::Sample* sample, int64_t local_root_span_id);
void ddup_push_span_id(Datadog::Sample* sample, uint64_t span_id);
void ddup_push_local_root_span_id(Datadog::Sample* sample, uint64_t local_root_span_id);
void ddup_push_trace_type(Datadog::Sample* sample, std::string_view trace_type);
void ddup_push_trace_resource_container(Datadog::Sample* sample, std::string_view trace_resource_container);
void ddup_push_exceptioninfo(Datadog::Sample* sample, std::string_view exception_type, int64_t count);
void ddup_push_class_name(Datadog::Sample* sample, std::string_view class_name);
void ddup_push_frame(Datadog::Sample* sample,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ namespace Datadog {
X(span_id, "span id") \
X(local_root_span_id, "local root span id") \
X(trace_type, "trace type") \
X(trace_resource_container, "trace resource container") \
X(trace_endpoint, "trace endpoint") \
X(class_name, "class name") \
X(lock_name, "lock name")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class Sample
bool push_span_id(uint64_t span_id);
bool push_local_root_span_id(uint64_t local_root_span_id);
bool push_trace_type(std::string_view trace_type);
bool push_trace_resource_container(std::string_view trace_resource_container);
bool push_exceptioninfo(std::string_view exception_type, int64_t count);
bool push_class_name(std::string_view class_name);
bool push_monotonic_ns(int64_t monotonic_ns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ ddup_push_task_name(Datadog::Sample* sample, std::string_view task_name) // cppc
}

void
ddup_push_span_id(Datadog::Sample* sample, int64_t span_id) // cppcheck-suppress unusedFunction
ddup_push_span_id(Datadog::Sample* sample, uint64_t span_id) // cppcheck-suppress unusedFunction
{
sample->push_span_id(span_id);
}

void
ddup_push_local_root_span_id(Datadog::Sample* sample, int64_t local_root_span_id) // cppcheck-suppress unusedFunction
ddup_push_local_root_span_id(Datadog::Sample* sample, uint64_t local_root_span_id) // cppcheck-suppress unusedFunction
{
sample->push_local_root_span_id(local_root_span_id);
}
Expand All @@ -235,13 +235,6 @@ ddup_push_trace_type(Datadog::Sample* sample, std::string_view trace_type) // cp
sample->push_trace_type(trace_type);
}

void
ddup_push_trace_resource_container(Datadog::Sample* sample, // cppcheck-suppress unusedFunction
std::string_view trace_resource_container)
{
sample->push_trace_resource_container(trace_resource_container);
}

void
ddup_push_exceptioninfo(Datadog::Sample* sample, // cppcheck-suppress unusedFunction
std::string_view exception_type,
Expand Down Expand Up @@ -314,3 +307,38 @@ ddup_upload() // cppcheck-suppress unusedFunction
}
return success;
}

void
ddup_profile_set_endpoints(
std::map<int64_t, std::string_view> span_ids_to_endpoints) // cppcheck-suppress unusedFunction
{
ddog_prof_Profile& profile = Datadog::Sample::profile_borrow();
for (const auto& [span_id, trace_endpoint] : span_ids_to_endpoints) {
ddog_CharSlice trace_endpoint_slice = Datadog::to_slice(trace_endpoint);
auto res = ddog_prof_Profile_set_endpoint(&profile, span_id, trace_endpoint_slice);
if (!res.ok) {
auto err = res.err;
const std::string errmsg = Datadog::err_to_msg(&err, "Error setting endpoint");
std::cerr << errmsg << std::endl;
ddog_Error_drop(&err);
}
}
Datadog::Sample::profile_release();
}

void
ddup_profile_add_endpoint_counts(std::map<std::string_view, int64_t> trace_endpoints_to_counts)
{
ddog_prof_Profile& profile = Datadog::Sample::profile_borrow();
for (const auto& [trace_endpoint, count] : trace_endpoints_to_counts) {
ddog_CharSlice trace_endpoint_slice = Datadog::to_slice(trace_endpoint);
auto res = ddog_prof_Profile_add_endpoint_count(&profile, trace_endpoint_slice, count);
if (!res.ok) {
auto err = res.err;
const std::string errmsg = Datadog::err_to_msg(&err, "Error adding endpoint count");
std::cerr << errmsg << std::endl;
ddog_Error_drop(&err);
}
}
Datadog::Sample::profile_release();
}
10 changes: 0 additions & 10 deletions ddtrace/internal/datadog/profiling/dd_wrapper/src/sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,6 @@ Datadog::Sample::push_trace_type(std::string_view trace_type)
return true;
}

bool
Datadog::Sample::push_trace_resource_container(std::string_view trace_resource_container)
{
if (!push_label(ExportLabelKey::trace_resource_container, trace_resource_container)) {
std::cout << "bad push" << std::endl;
return false;
}
return true;
}

bool
Datadog::Sample::push_class_name(std::string_view class_name)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Datadog::Uploader::upload(ddog_prof_Profile& profile)
ddog_prof_Exporter_Slice_File_empty(),
{ .ptr = &file, .len = 1 },
nullptr,
nullptr,
encoded->endpoints_stats,
nullptr,
nullptr);
ddog_prof_EncodedProfile_drop(encoded);
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/internal/datadog/profiling/ddup/_ddup.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ class SampleHandle:
def push_task_name(self, task_name: StringType) -> None: ...
def push_exceptioninfo(self, exc_type: Union[None, bytes, str, type], count: int) -> None: ...
def push_class_name(self, class_name: StringType) -> None: ...
def push_span(self, span: Optional[Span], endpoint_collection_enabled: bool) -> None: ...
def push_span(self, span: Optional[Span]) -> None: ...
def push_monotonic_ns(self, monotonic_ns: int) -> None: ...
def flush_sample(self) -> None: ...
43 changes: 33 additions & 10 deletions ddtrace/internal/datadog/profiling/ddup/_ddup.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ from typing import Dict
from typing import Optional
from typing import Union

from libcpp.map cimport map
from libcpp.utility cimport pair

import ddtrace
import platform
from .._types import StringType
Expand Down Expand Up @@ -46,6 +49,10 @@ cdef extern from "ddup_interface.hpp":
void ddup_config_sample_type(unsigned int type)

void ddup_start()
void ddup_set_runtime_id(string_view _id)
void ddup_profile_set_endpoints(map[int64_t, string_view] span_ids_to_endpoints)
void ddup_profile_add_endpoint_counts(map[string_view, int64_t] trace_endpoints_to_counts)
bint ddup_upload() nogil

Sample *ddup_start_sample()
void ddup_push_walltime(Sample *sample, int64_t walltime, int64_t count)
Expand All @@ -61,15 +68,12 @@ cdef extern from "ddup_interface.hpp":
void ddup_push_span_id(Sample *sample, uint64_t span_id)
void ddup_push_local_root_span_id(Sample *sample, uint64_t local_root_span_id)
void ddup_push_trace_type(Sample *sample, string_view trace_type)
void ddup_push_trace_resource_container(Sample *sample, string_view trace_resource_container)
void ddup_push_exceptioninfo(Sample *sample, string_view exception_type, int64_t count)
void ddup_push_class_name(Sample *sample, string_view class_name)
void ddup_push_frame(Sample *sample, string_view _name, string_view _filename, uint64_t address, int64_t line)
void ddup_push_monotonic_ns(Sample *sample, int64_t monotonic_ns)
void ddup_flush_sample(Sample *sample)
void ddup_drop_sample(Sample *sample)
void ddup_set_runtime_id(string_view _id)
bint ddup_upload() nogil

# Create wrappers for cython
cdef call_ddup_config_service(bytes service):
Expand Down Expand Up @@ -171,6 +175,31 @@ def start() -> None:
def upload() -> None:
runtime_id = ensure_binary_or_empty(get_runtime_id())
ddup_set_runtime_id(string_view(<const char*>runtime_id, len(runtime_id)))

processor = ddtrace.tracer._endpoint_call_counter_span_processor
endpoint_counts, endpoint_to_span_ids = processor.reset()

cdef map[int64_t, string_view] span_ids_to_endpoints = map[int64_t, string_view]()
for endpoint, span_ids in endpoint_to_span_ids.items():
endpoint_bytes = ensure_binary_or_empty(endpoint)
for span_id in span_ids:
span_ids_to_endpoints.insert(
pair[int64_t, string_view](
clamp_to_uint64_unsigned(span_id),
string_view(<const char*>endpoint_bytes, len(endpoint_bytes))
)
)
ddup_profile_set_endpoints(span_ids_to_endpoints)

cdef map[string_view, int64_t] trace_endpoints_to_counts = map[string_view, int64_t]()
for endpoint, cnt in endpoint_counts.items():
endpoint_bytes = ensure_binary_or_empty(endpoint)
trace_endpoints_to_counts.insert(pair[string_view, int64_t](
string_view(<const char*>endpoint_bytes, len(endpoint_bytes)),
clamp_to_int64_unsigned(cnt)
))
ddup_profile_add_endpoint_counts(trace_endpoints_to_counts)

with nogil:
ddup_upload()

Expand Down Expand Up @@ -269,7 +298,7 @@ cdef class SampleHandle:
class_name_bytes = ensure_binary_or_empty(class_name)
ddup_push_class_name(self.ptr, string_view(<const char*>class_name_bytes, len(class_name_bytes)))

def push_span(self, span: Optional[Span], endpoint_collection_enabled: bool) -> None:
def push_span(self, span: Optional[Span]) -> None:
if self.ptr is NULL:
return
if not span:
Expand All @@ -283,12 +312,6 @@ cdef class SampleHandle:
if span._local_root.span_type:
span_type_bytes = ensure_binary_or_empty(span._local_root.span_type)
ddup_push_trace_type(self.ptr, string_view(<const char*>span_type_bytes, len(span_type_bytes)))
if endpoint_collection_enabled:
root_resource_bytes = ensure_binary_or_empty(span._local_root.resource)
ddup_push_trace_resource_container(
self.ptr,
string_view(<const char*>root_resource_bytes, len(root_resource_bytes))
)

def push_monotonic_ns(self, monotonic_ns: int) -> None:
if self.ptr is not NULL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def test():
h.push_task_id(value)
h.push_task_name(name)
h.push_exceptioninfo(exc_type, value)
h.push_span(span, endpoint)
h.push_span(span)
h.push_frame(name, name, value, lineno)
h.flush_sample()
except Exception as e:
Expand Down
17 changes: 14 additions & 3 deletions ddtrace/internal/processor/endpoint_call_counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
@dataclass(eq=False)
class EndpointCallCounterProcessor(SpanProcessor):
endpoint_counts: EndpointCountsType = field(default_factory=dict, init=False, repr=False, compare=False)
# Mapping from endpoint to list of span IDs, here we use mapping from
# endpoint to span_ids instead of mapping from span_id to endpoint to
# avoid creating a new string object for each span id.
endpoint_to_span_ids: typing.Dict[str, typing.List[int]] = field(
default_factory=dict, init=False, repr=False, compare=False
)
_endpoint_counts_lock: typing.ContextManager = field(
default_factory=forksafe.Lock, init=False, repr=False, compare=False
)
Expand All @@ -34,12 +40,17 @@ def on_span_finish(self, span):
return
if span._local_root == span and span.span_type == SpanTypes.WEB:
resource = ensure_text(span.resource, errors="backslashreplace")
span_id = span.span_id
with self._endpoint_counts_lock:
self.endpoint_counts[resource] = self.endpoint_counts.get(resource, 0) + 1
if resource not in self.endpoint_to_span_ids:
self.endpoint_to_span_ids[resource] = []
self.endpoint_to_span_ids[resource].append(span_id)

def reset(self):
# type: () -> EndpointCountsType
def reset(self) -> typing.Tuple[EndpointCountsType, typing.Dict[str, typing.List[int]]]:
with self._endpoint_counts_lock:
counts = self.endpoint_counts
self.endpoint_counts = {}
return counts
span_ids = self.endpoint_to_span_ids
self.endpoint_to_span_ids = {}
return counts, span_ids
3 changes: 2 additions & 1 deletion ddtrace/profiling/collector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def periodic(self):
# type: (...) -> None
"""Collect events and push them into the recorder."""
for events in self.collect():
self.recorder.push_events(events)
if self.recorder:
self.recorder.push_events(events)

def collect(self):
# type: (...) -> typing.Iterable[typing.Iterable[event.Event]]
Expand Down
6 changes: 2 additions & 4 deletions ddtrace/profiling/collector/_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _acquire(self, inner_func, *args, **kwargs):
handle.push_task_name(task_name)

if self._self_tracer is not None:
handle.push_span(self._self_tracer.current_span(), self._self_endpoint_collection_enabled)
handle.push_span(self._self_tracer.current_span())
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.flush_sample()
Expand Down Expand Up @@ -214,9 +214,7 @@ def _release(self, inner_func, *args, **kwargs):
handle.push_task_name(task_name)

if self._self_tracer is not None:
handle.push_span(
self._self_tracer.current_span(), self._self_endpoint_collection_enabled
)
handle.push_span(self._self_tracer.current_span())
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.flush_sample()
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/profiling/collector/stack.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
handle.push_class_name(frames[0].class_name)
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.push_span(span, collect_endpoint)
handle.push_span(span)
handle.flush_sample()
else:
event = stack_event.StackSampleEvent(
Expand Down Expand Up @@ -399,7 +399,7 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
handle.push_class_name(frames[0].class_name)
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.push_span(span, collect_endpoint)
handle.push_span(span)
handle.flush_sample()
else:
exc_event = stack_event.StackExceptionSampleEvent(
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/profiling/exporter/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def export(
} # type: Dict[str, Any]

if self.endpoint_call_counter_span_processor is not None:
event["endpoint_counts"] = self.endpoint_call_counter_span_processor.reset()
event["endpoint_counts"] = self.endpoint_call_counter_span_processor.reset()[0]

content_type, body = self._encode_multipart_formdata(
event=json.dumps(event).encode("utf-8"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
Fixes endpoint profiling when using libdatadog exporter, either with
``DD_PROFILING_EXPORT_LIBDD_ENABLED`` or ``DD_PROFILING_TIMELINE_ENABLED``.

Loading
Loading