Skip to content

Commit

Permalink
fix(llmobs): recreate eval metric writer on fork (#10710)
Browse files Browse the repository at this point in the history
This PR ensures that the `LLMObsEvalMetricWriter` is correctly recreated
and restarted on a forked process.

Previously, on a process fork we were not recreating/restarting the eval
metric writer worker.

Mirrors #10249, but for the eval
metric writer.

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: lievan <evan.li@datadoqhq.com>
Co-authored-by: Yun Kim <35776586+Yun-Kim@users.noreply.github.com>
(cherry picked from commit 5dbd7ef)
  • Loading branch information
lievan authored and github-actions[bot] committed Sep 23, 2024
1 parent efb3c0e commit a6a86ee
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 15 deletions.
4 changes: 3 additions & 1 deletion ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,17 @@ def __init__(self, tracer=None):

def _child_after_fork(self):
# Replaces both LLMObs writers with recreated instances, repoints the trace processor at the
# new span writer, reconfigures the tracer filters, and then starts both writers.
# NOTE(review): leading indentation is not preserved in this rendering, so the nesting of the
# statements below (e.g. whether tracer.configure sits under the `if`) cannot be read from here.
self._llmobs_span_writer = self._llmobs_span_writer.recreate()
self._llmobs_eval_metric_writer = self._llmobs_eval_metric_writer.recreate()
# The trace processor keeps its own reference to the span writer; update it to the new instance.
self._trace_processor._span_writer = self._llmobs_span_writer
tracer_filters = self.tracer._filters
# Append the LLMObs trace processor only if no filter of that type is already registered.
if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
tracer_filters += [self._trace_processor]
self.tracer.configure(settings={"FILTERS": tracer_filters})
# NOTE(review): both start() calls share one try, so a ServiceStatusError raised by the span
# writer's start() skips the eval metric writer's start() entirely.
try:
self._llmobs_span_writer.start()
self._llmobs_eval_metric_writer.start()
except ServiceStatusError:
# NOTE(review): the next two lines appear to be the removed and added sides of the same diff
# line (this hunk reports 3 additions & 1 deletion); presumably only the second one remains
# in the committed file -- confirm against the repository.
log.debug("Error starting LLMObs span writer after fork")
log.debug("Error starting LLMObs writers after fork")

def _start_service(self) -> None:
tracer_filters = self.tracer._filters
Expand Down
8 changes: 8 additions & 0 deletions ddtrace/llmobs/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ def _url(self) -> str:
def _data(self, events: List[Any]) -> Dict[str, Any]:
# The base implementation is a placeholder: it unconditionally raises NotImplementedError.
# NOTE(review): presumably concrete writers override this to turn `events` into a request
# payload -- confirm against the subclasses, which are not visible here.
raise NotImplementedError

def recreate(self) -> "BaseLLMObsWriter":
    """Return a new writer of the same concrete class, configured like this one.

    The new instance is built by calling this writer's own class with the
    current site, API key, interval and timeout; those four settings are the
    only values passed to the constructor.
    """
    constructor_kwargs = {
        "site": self._site,
        "api_key": self._api_key,
        "interval": self._interval,
        "timeout": self._timeout,
    }
    return self.__class__(**constructor_kwargs)


class LLMObsEvalMetricWriter(BaseLLMObsWriter):
"""Writer to the Datadog LLMObs Custom Eval Metrics Endpoint."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
LLM Observability: This fix resolves an issue where LLM Observability evaluation metrics were not being submitted
in forked processes. The evaluation metric writer thread now automatically restarts when a forked
process is detected.
70 changes: 56 additions & 14 deletions tests/llmobs/test_llmobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1362,20 +1362,7 @@ def test_activate_distributed_headers_activates_context(LLMObs, mock_logs):
mock_activate.assert_called_once_with(dummy_context)


def _task(llmobs_service, errors, original_pid, original_span_writer_id):
"""Task in test_llmobs_fork which asserts that LLMObs in a forked process correctly recreates the writer."""
# NOTE(review): this helper is on the removed side of the diff, and its indentation is not
# preserved here, so which assertions run inside the workflow/task spans cannot be read from
# this rendering.
try:
with llmobs_service.workflow():
with llmobs_service.task():
# The tracer pid and the span writer identity must both differ from the values passed in.
assert llmobs_service._instance.tracer._pid != original_pid
assert id(llmobs_service._instance._llmobs_span_writer) != original_span_writer_id
# call_count is read on enqueue/encode, so the span writer is expected to be mocked by the caller.
assert llmobs_service._instance._llmobs_span_writer.enqueue.call_count == 2
assert llmobs_service._instance._llmobs_span_writer._encoder.encode.call_count == 2
except AssertionError as e:
# Failures are not re-raised; they are put on `errors` instead.
errors.put(e)

def test_llmobs_fork_recreates_and_restarts_writer():
def test_llmobs_fork_recreates_and_restarts_span_writer():
"""Test that forking a process correctly recreates and restarts the LLMObsSpanWriter."""
with mock.patch("ddtrace.internal.writer.HTTPWriter._send_payload"):
llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
Expand Down Expand Up @@ -1405,6 +1392,30 @@ def test_llmobs_fork_recreates_and_restarts_writer():
llmobs_service.disable()


def test_llmobs_fork_recreates_and_restarts_eval_metric_writer():
    """Test that forking a process correctly recreates and restarts the LLMObsEvalMetricWriter.

    The parent must keep its original eval metric writer, still running. The child must hold a
    different writer instance that is running. The child reports success to the parent by exiting
    with code 12; any other exit code fails the test.
    """
    with mock.patch("ddtrace.llmobs._writer.BaseLLMObsWriter.periodic"):
        llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
        original_pid = llmobs_service._instance.tracer._pid
        original_eval_metric_writer = llmobs_service._instance._llmobs_eval_metric_writer
        pid = os.fork()
        if pid:  # parent
            assert llmobs_service._instance.tracer._pid == original_pid
            assert llmobs_service._instance._llmobs_eval_metric_writer == original_eval_metric_writer
            assert llmobs_service._instance._llmobs_eval_metric_writer.status == ServiceStatus.RUNNING
        else:  # child
            child_exit_code = 1
            try:
                assert llmobs_service._instance.tracer._pid != original_pid
                assert llmobs_service._instance._llmobs_eval_metric_writer != original_eval_metric_writer
                assert llmobs_service._instance._llmobs_eval_metric_writer.status == ServiceStatus.RUNNING
                llmobs_service.disable()
                child_exit_code = 12
            finally:
                # Always leave through os._exit: if a check above fails, the forked child must not
                # fall back into the test runner. A code other than 12 fails the parent's assertion.
                os._exit(child_exit_code)

        _, status = os.waitpid(pid, 0)
        exit_code = os.WEXITSTATUS(status)
        assert exit_code == 12
        llmobs_service.disable()


def test_llmobs_fork_create_span(monkeypatch):
"""Test that forking a process correctly encodes new spans created in each process."""
monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", 5.0)
Expand All @@ -1429,6 +1440,37 @@ def test_llmobs_fork_create_span(monkeypatch):
llmobs_service.disable()


def test_llmobs_fork_submit_evaluation(monkeypatch):
    """Test that an evaluation metric submitted after a fork is buffered in each process.

    Both the parent and the forked child submit one evaluation and must each see exactly one
    entry in their own eval metric writer's buffer. The child reports success to the parent by
    exiting with code 12; any other exit code fails the test.
    """
    # Environment values must be strings; passing a float makes pytest warn and convert it.
    monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", "5.0")
    with mock.patch("ddtrace.llmobs._writer.BaseLLMObsWriter.periodic"):
        llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app", api_key="test_api_key")
        pid = os.fork()
        if pid:  # parent
            llmobs_service.submit_evaluation(
                span_context={"span_id": "123", "trace_id": "456"},
                label="toxicity",
                metric_type="categorical",
                value="high",
            )
            assert len(llmobs_service._instance._llmobs_eval_metric_writer._buffer) == 1
        else:  # child
            child_exit_code = 1
            try:
                llmobs_service.submit_evaluation(
                    span_context={"span_id": "123", "trace_id": "456"},
                    label="toxicity",
                    metric_type="categorical",
                    value="high",
                )
                assert len(llmobs_service._instance._llmobs_eval_metric_writer._buffer) == 1
                llmobs_service.disable()
                child_exit_code = 12
            finally:
                # Always leave through os._exit: if a check above fails, the forked child must not
                # fall back into the test runner. A code other than 12 fails the parent's assertion.
                os._exit(child_exit_code)

        _, status = os.waitpid(pid, 0)
        exit_code = os.WEXITSTATUS(status)
        assert exit_code == 12
        llmobs_service.disable()


def test_llmobs_fork_custom_filter(monkeypatch):
"""Test that forking a process correctly keeps any custom filters."""

Expand Down

0 comments on commit a6a86ee

Please sign in to comment.