Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(llmobs): recreate eval metric writer on fork #10710

Merged
merged 9 commits into from
Sep 23, 2024
4 changes: 3 additions & 1 deletion ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,17 @@ def __init__(self, tracer=None):

def _child_after_fork(self):
    """Rebuild and restart the LLMObs writers inside a forked child process.

    The span writer and the evaluation metric writer are each replaced by
    the object returned from their own ``recreate()`` method, the trace
    processor is pointed at the new span writer, the processor is added to
    the tracer filters when no ``LLMObsTraceProcessor`` is registered yet,
    and both new writers are started.
    """
    self._llmobs_span_writer = self._llmobs_span_writer.recreate()
    self._llmobs_eval_metric_writer = self._llmobs_eval_metric_writer.recreate()
    # Re-point the processor so it uses the recreated span writer.
    self._trace_processor._span_writer = self._llmobs_span_writer
    tracer_filters = self.tracer._filters
    if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
        tracer_filters += [self._trace_processor]
    self.tracer.configure(settings={"FILTERS": tracer_filters})
    try:
        self._llmobs_span_writer.start()
        self._llmobs_eval_metric_writer.start()
    except ServiceStatusError:
        # Best effort: a start failure is only logged. A single message covers
        # both writers, since either start() call may be the one that raised.
        log.debug("Error starting LLMObs writers after fork")

def _start_service(self) -> None:
tracer_filters = self.tracer._filters
Expand Down
8 changes: 8 additions & 0 deletions ddtrace/llmobs/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ def _url(self) -> str:
def _data(self, events: List[Any]) -> Dict[str, Any]:
    """Turn a list of events into a dictionary.

    This version always raises ``NotImplementedError``; presumably concrete
    writer subclasses override it (their bodies are not visible here).
    """
    raise NotImplementedError()

def recreate(self) -> "BaseLLMObsWriter":
    """Return a new writer of the same class, built from this writer's settings.

    The new instance is constructed with this writer's ``_site``,
    ``_api_key``, ``_interval`` and ``_timeout`` values.
    """
    writer_cls = self.__class__
    settings = {
        "site": self._site,
        "api_key": self._api_key,
        "interval": self._interval,
        "timeout": self._timeout,
    }
    return writer_cls(**settings)


class LLMObsEvalMetricWriter(BaseLLMObsWriter):
"""Writer to the Datadog LLMObs Custom Eval Metrics Endpoint."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
LLM Observability: This fix resolves an issue where LLM Observability evaluation metrics were not being submitted
in forked processes. The evaluation metric writer thread now automatically restarts when a forked
process is detected.
70 changes: 56 additions & 14 deletions tests/llmobs/test_llmobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1422,20 +1422,7 @@ def test_activate_distributed_headers_activates_context(LLMObs, mock_logs):
mock_activate.assert_called_once_with(dummy_context)


def _task(llmobs_service, errors, original_pid, original_span_writer_id):
    """Task in test_llmobs_fork which asserts that LLMObs in a forked process correctly recreates the writer.

    Any ``AssertionError`` is put on ``errors`` instead of being raised, so
    the caller can inspect failures afterwards.
    """
    try:
        with llmobs_service.workflow():
            with llmobs_service.task():
                assert llmobs_service._instance.tracer._pid != original_pid
                assert id(llmobs_service._instance._llmobs_span_writer) != original_span_writer_id
        # NOTE(review): the call-count checks are placed after both span
        # contexts have exited, on the assumption that a span is enqueued
        # when it finishes -- confirm against the original file.
        assert llmobs_service._instance._llmobs_span_writer.enqueue.call_count == 2
        assert llmobs_service._instance._llmobs_span_writer._encoder.encode.call_count == 2
    except AssertionError as e:
        errors.put(e)


def test_llmobs_fork_recreates_and_restarts_writer():
def test_llmobs_fork_recreates_and_restarts_span_writer():
"""Test that forking a process correctly recreates and restarts the LLMObsSpanWriter."""
with mock.patch("ddtrace.internal.writer.HTTPWriter._send_payload"):
llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
Expand Down Expand Up @@ -1465,6 +1452,30 @@ def test_llmobs_fork_recreates_and_restarts_writer():
llmobs_service.disable()


def test_llmobs_fork_recreates_and_restarts_eval_metric_writer():
    """Test that forking a process correctly recreates and restarts the LLMObsEvalMetricWriter."""
    with mock.patch("ddtrace.llmobs._writer.BaseLLMObsWriter.periodic"):
        llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
        original_pid = llmobs_service._instance.tracer._pid
        original_eval_metric_writer = llmobs_service._instance._llmobs_eval_metric_writer
        pid = os.fork()
        if pid:  # parent
            assert llmobs_service._instance.tracer._pid == original_pid
            assert llmobs_service._instance._llmobs_eval_metric_writer == original_eval_metric_writer
            assert llmobs_service._instance._llmobs_eval_metric_writer.status == ServiceStatus.RUNNING
        else:  # child
            # The child must always terminate here. Without the finally, a
            # failing assert would unwind into the parent's pytest session
            # running inside the child. Exit code 12 signals success; any
            # failure exits with 1, which the parent's check below rejects.
            child_exit_code = 1
            try:
                assert llmobs_service._instance.tracer._pid != original_pid
                assert llmobs_service._instance._llmobs_eval_metric_writer != original_eval_metric_writer
                assert llmobs_service._instance._llmobs_eval_metric_writer.status == ServiceStatus.RUNNING
                llmobs_service.disable()
                child_exit_code = 12
            finally:
                os._exit(child_exit_code)

        _, status = os.waitpid(pid, 0)
        exit_code = os.WEXITSTATUS(status)
        assert exit_code == 12
        llmobs_service.disable()


def test_llmobs_fork_create_span(monkeypatch):
"""Test that forking a process correctly encodes new spans created in each process."""
monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", 5.0)
Expand All @@ -1489,6 +1500,37 @@ def test_llmobs_fork_create_span(monkeypatch):
llmobs_service.disable()


def test_llmobs_fork_submit_evaluation(monkeypatch):
    """Test that an evaluation metric submitted after a fork is buffered by the eval metric writer in each process."""
    # Environment values must be strings; pytest warns on (and implicitly
    # converts) any other type passed to setenv.
    monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", "5.0")
    with mock.patch("ddtrace.llmobs._writer.BaseLLMObsWriter.periodic"):
        llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app", api_key="test_api_key")
        pid = os.fork()
        if pid:  # parent
            llmobs_service.submit_evaluation(
                span_context={"span_id": "123", "trace_id": "456"},
                label="toxicity",
                metric_type="categorical",
                value="high",
            )
            assert len(llmobs_service._instance._llmobs_eval_metric_writer._buffer) == 1
        else:  # child
            # The child must always terminate here. Without the finally, a
            # failing assert would unwind into the parent's pytest session
            # running inside the child. Exit code 12 signals success; any
            # failure exits with 1, which the parent's check below rejects.
            child_exit_code = 1
            try:
                llmobs_service.submit_evaluation(
                    span_context={"span_id": "123", "trace_id": "456"},
                    label="toxicity",
                    metric_type="categorical",
                    value="high",
                )
                assert len(llmobs_service._instance._llmobs_eval_metric_writer._buffer) == 1
                llmobs_service.disable()
                child_exit_code = 12
            finally:
                os._exit(child_exit_code)

        _, status = os.waitpid(pid, 0)
        exit_code = os.WEXITSTATUS(status)
        assert exit_code == 12
        llmobs_service.disable()


def test_llmobs_fork_custom_filter(monkeypatch):
"""Test that forking a process correctly keeps any custom filters."""

Expand Down
Loading