diff --git a/ddtrace/llmobs/_evaluators/ragas/faithfulness.py b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py
new file mode 100644
index 0000000000..58c1009651
--- /dev/null
+++ b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py
@@ -0,0 +1,232 @@
+import json
+import math
+import time
+import typing
+from typing import Optional
+
+from langchain_core.pydantic_v1 import ValidationError
+import numpy as np
+from ragas.llms import llm_factory
+from ragas.llms.output_parser import RagasoutputParser
+from ragas.llms.output_parser import get_json_format_instructions
+from ragas.metrics import faithfulness
+from ragas.metrics.base import ensembler
+from ragas.metrics.base import get_segmenter
+
+from ddtrace import config
+from ddtrace.internal.logger import get_logger
+
+from .utils import FaithfulnessInputs
+from .utils import StatementFaithfulnessAnswers
+from .utils import StatementsAnswers
+from .utils import context_parser
+from .utils import extract_inputs_from_messages_prompt
+
+
+logger = get_logger(__name__)
+
+# populate default values for the faithfulness metric
+faithfulness.llm = llm_factory()
+
+statement_prompt = faithfulness.statement_prompt
+
+statements_output_instructions = get_json_format_instructions(StatementsAnswers)
+statements_output_parser = RagasoutputParser(pydantic_object=StatementsAnswers)
+
+faithfulness_output_instructions = get_json_format_instructions(StatementFaithfulnessAnswers)
+faithfulness_output_parser = RagasoutputParser(pydantic_object=StatementFaithfulnessAnswers)
+
+sentence_segmenter = get_segmenter(language=faithfulness.nli_statements_message.language, clean=False)
+
+
+def create_statements_prompt(answer, question, llmobs_service):
+    with llmobs_service.task("ragas.create_statements_prompt") as task:
+        task.service = "ragas"
+        sentences = sentence_segmenter.segment(answer)
+        sentences = [sentence for sentence in sentences if sentence.strip().endswith(".")]
+        sentences = "\n".join([f"{i}:{x}" for i, x in enumerate(sentences)])
+        return statement_prompt.format(question=question, answer=answer, sentences=sentences)
+
+
+def create_nli_prompt(statements, context_str, llmobs_service):
+    with llmobs_service.task("ragas.create_nli_prompt") as task:
+        task.service = "ragas"
+        statements_str: str = json.dumps(statements)
+        prompt_value = faithfulness.nli_statements_message.format(context=context_str, statements=statements_str)
+        return prompt_value
+
+
+def compute_score(answers, llmobs_service):
+    with llmobs_service.task("ragas.compute_score") as task:
+        task.service = "ragas"
+        faithful_statements = sum(1 if answer.verdict else 0 for answer in answers.__root__)
+        num_statements = len(answers.__root__)
+        if num_statements:
+            score = faithful_statements / num_statements
+        else:
+            score = np.nan
+        llmobs_service.annotate(
+            metadata={
+                "faithful_statements": faithful_statements,
+                "num_statements": num_statements,
+            },
+            output_data=score,
+        )
+        return score
+
+
+def extract_question_and_context_using_llm(messages, llmobs_service):
+    with llmobs_service.workflow("ragas.extract_question_and_context_using_llm"):
+        llmobs_service.annotate(input_data=messages)
+        extracted_inputs = faithfulness.llm.generate_text(
+            prompt=extract_inputs_from_messages_prompt.format(messages=messages)
+        )
+        statements = context_parser.parse(extracted_inputs.generations[0][0].text)
+        llmobs_service.annotate(
+            input_data=messages, output_data={"question": statements.question, "context": statements.context}
+        )
+        return statements.question, statements.context
+
+
+def extract_faithfulness_inputs(span: dict, llmobs_service) -> typing.Optional[FaithfulnessInputs]:
+    with llmobs_service.workflow("ragas.extract_faithfulness_inputs"):
+        llmobs_service.annotate(input_data=span)
+        question, answer, context_str = None, None, None
+
+        meta_io = span.get("meta")
+        if meta_io is None:
+            return None
+
+        meta_input = meta_io.get("input")
+        meta_output = meta_io.get("output")
+
+        if meta_input is None or meta_output is None:
+            return None
+
+        messages = meta_output.get("messages")
+        if messages is not None and len(messages) > 0:
+            answer = messages[-1].get("content")
+
+        prompt = meta_input.get("prompt")
+        if prompt is not None and prompt.get("variables") is not None:
+            variables = prompt.get("variables")
+            question = variables.get("question")
+            context_str = variables.get("context")
+
+        if question is None or context_str is None:
+            question, context_str = extract_question_and_context_using_llm(span, llmobs_service)
+        try:
+            llmobs_service.annotate(output_data={"question": question, "context": context_str, "answer": answer})
+            return FaithfulnessInputs(question=question, context=context_str, answer=answer)
+        except ValidationError as e:
+            logger.debug("Failed to validate faithfulness inputs: %s", e)
+            return None
+
+
+def score_faithfulness(span, llmobs_service):
+    llmobs_metadata = {}
+    token_usage = {"input_tokens": 0, "output_tokens": 0}
+    score = np.nan
+    question, answer, context_str = None, None, None
+    with llmobs_service.workflow("ragas.faithfulness") as workflow:
+        try:
+            workflow.service = "ragas"
+
+            faithfulness_inputs = extract_faithfulness_inputs(span, llmobs_service)
+            if faithfulness_inputs is None:
+                return np.nan, None, llmobs_service.export_span()
+
+            question, answer, context_str = (
+                faithfulness_inputs.question,
+                faithfulness_inputs.answer,
+                faithfulness_inputs.context,
+            )
+
+            statements_prompt = create_statements_prompt(answer=answer, question=question, llmobs_service=llmobs_service)
+
+            statements = faithfulness.llm.generate_text(statements_prompt)
+
+            usage = statements.llm_output.get("token_usage")
+            if usage:
+                token_usage["input_tokens"] += usage.get("prompt_tokens") or 0
+                token_usage["output_tokens"] += usage.get("completion_tokens") or 0
+
+            statements = statements_output_parser.parse(statements.generations[0][0].text)
+
+            if statements is None:
+                return np.nan, None, llmobs_service.export_span()
+            statements = [item["simpler_statements"] for item in statements.dicts()]
+            statements = [item for sublist in statements for item in sublist]
+
+            llmobs_metadata["statements"] = statements
+
+            assert isinstance(statements, typing.List), "statements must be a list"
+
+            p_value = create_nli_prompt(statements, context_str, llmobs_service=llmobs_service)
+
+            nli_result = faithfulness.llm.generate_text(p_value)
+
+            usage = nli_result.llm_output.get("token_usage")
+            if usage:
+                token_usage["input_tokens"] += usage.get("prompt_tokens") or 0
+                token_usage["output_tokens"] += usage.get("completion_tokens") or 0
+
+            nli_result_text = [nli_result.generations[0][i].text for i in range(faithfulness._reproducibility)]
+            faithfulness_list = [faithfulness_output_parser.parse(text) for text in nli_result_text]
+
+            faithfulness_list = [faith.dicts() for faith in faithfulness_list if faith is not None]
+
+            llmobs_metadata["faithfulness_list"] = faithfulness_list
+
+            if faithfulness_list:
+                faithfulness_list = ensembler.from_discrete(
+                    faithfulness_list,
+                    "verdict",
+                )
+
+                faithfulness_list = StatementFaithfulnessAnswers.parse_obj(faithfulness_list)
+            else:
+                return np.nan, None, llmobs_service.export_span()
+            score = compute_score(faithfulness_list, llmobs_service=llmobs_service)
+            return score, faithfulness_list.json(), llmobs_service.export_span()
+        finally:
+            llmobs_metadata.update(token_usage)
+            llmobs_service.annotate(
+                input_data={
+                    "answer": answer,
+                    "question": question,
+                    "context_str": context_str,
+                },
+                output_data=score,
+                metadata=llmobs_metadata,
+            )
+
+
+class RagasFaithfulnessEvaluator:
+    label = "ragas_faithfulness"
+    metric_type = "score"
+    llmobs_service = None
+
+    def __init__(self, llmobs_service):
+        RagasFaithfulnessEvaluator.llmobs_service = llmobs_service
+
+    @classmethod
+    def evaluate(cls, span) -> Optional[dict]:
+        if cls.llmobs_service is None:
+            return None
+
+        score, faithfulness_list, exported_span = score_faithfulness(span, cls.llmobs_service)
+        if math.isnan(score):
+            return None
+        return {
+            "span_id": exported_span.get("span_id"),
+            "trace_id": exported_span.get("trace_id"),
+            "score_value": 1,
+            "ml_app": config._llmobs_ml_app,
+            "timestamp_ms": math.floor(time.time() * 1000),
+            "metric_type": cls.metric_type,
+            "label": cls.label,
+            "metadata": {"ragas.faithfulness_list": faithfulness_list},
+        }
diff --git a/ddtrace/llmobs/_evaluators/ragas/utils.py b/ddtrace/llmobs/_evaluators/ragas/utils.py
new file mode 100644
index 0000000000..11fad1d5b8
--- /dev/null
+++ b/ddtrace/llmobs/_evaluators/ragas/utils.py
@@ -0,0 +1,96 @@
+import typing as t
+
+from langchain_core.pydantic_v1 import BaseModel
+from langchain_core.pydantic_v1 import Field
+from ragas.llms.output_parser import RagasoutputParser
+from ragas.llms.prompt import Prompt
+
+
+class FaithfulnessInputs(BaseModel):
+    question: str = Field(..., description="the question to be answered")
+    context: str = Field(..., description="the context to be used to answer the question")
+    answer: str = Field(..., description="the answer to the question")
+
+
+class StatementFaithfulnessAnswer(BaseModel):
+    statement: str = Field(..., description="the original statement, word-for-word")
+    reason: str = Field(..., description="the reason for the verdict")
+    verdict: int = Field(..., description="the verdict (0/1) of the faithfulness")
+
+
+class StatementFaithfulnessAnswers(BaseModel):
+    __root__: t.List[StatementFaithfulnessAnswer]
+
+    def dicts(self) -> t.List[t.Dict]:
+        return self.dict()["__root__"]
+
+
+class Statements(BaseModel):
+    sentence_index: int = Field(..., description="Index of the sentence from the statement list")
+    simpler_statements: t.List[str] = Field(..., description="the simpler statements")
+
+
+class StatementsAnswers(BaseModel):
+    __root__: t.List[Statements]
+
+    def dicts(self) -> t.List[t.Dict]:
+        return self.dict()["__root__"]
+
+
+class ExtractedContext(BaseModel):
+    context: str = Field(..., description="the extracted context")
+    question: str = Field(..., description="the extracted question")
+
+
+context_parser = RagasoutputParser(pydantic_object=ExtractedContext)
+
+
+extract_inputs_from_messages_prompt = Prompt(
+    name="extract_context",
+    instruction="""You will be given a prompt to a large language model.
+    The prompt will contain a question and the reference information
+    that should be used to answer that question.
+    Your task is to extract the reference information.
+    Do not include any text that is not in the original input.""",
+    examples=[
+        {
+            "messages": [
+                {
+                    "role": "user",
+                    "content": """
+Given the following question and reference context, answer the user's question
+question: What are the effects of carbonated water on teeth?
+context_str: Carbonated water has negative, destructive effects on teeth, and result in
+decreasing microhardness and removal of the adhesive material on etched or sealed enamel.
+Erosion occurred when the etched enamel of teeth was exposed to carbonated water,
+particularly in groups exposed to high-level carbonated water.
+Alleviation of this destructive effect is observed in groups exposed to carbonated water with calcium ion.
+Partial removal of the adhesive material on sealed enamel could be observed after exposure to carbonated water.
+                    """,
+                },
+            ],
+            "output": {
+                "context": """
+Carbonated water has negative, destructive effects on teeth, and result in
+decreasing microhardness and removal of the adhesive material on etched or sealed enamel.
+Erosion occurred when the etched enamel of teeth was exposed to carbonated water,
+particularly in groups exposed to high-level carbonated water.
+Alleviation of this destructive effect is observed in groups exposed to carbonated water with calcium ion.
+Partial removal of the adhesive material on sealed enamel could be observed after exposure to carbonated water.
+                """,
+                "question": "What are the effects of carbonated water on teeth?",
+            },
+        },
+    ],
+    input_keys=["messages"],
+    output_key="output",
+    output_format_instruction="""
+    The output should be a json with the question and the context extracted from the messages.
+    For example:
+    {
+        "context": "Extracted context",
+        "question": "Extracted question"
+    }
+    """,
+    output_type="json",
+)
diff --git a/ddtrace/llmobs/_evaluators/runner.py b/ddtrace/llmobs/_evaluators/runner.py
new file mode 100644
index 0000000000..a47bc3bd03
--- /dev/null
+++ b/ddtrace/llmobs/_evaluators/runner.py
@@ -0,0 +1,84 @@
+import atexit
+from concurrent import futures
+import os
+from typing import Dict
+
+from ddtrace.internal import forksafe
+from ddtrace.internal.logger import get_logger
+from ddtrace.internal.periodic import PeriodicService
+
+from .ragas.faithfulness import RagasFaithfulnessEvaluator
+
+
+logger = get_logger(__name__)
+
+SUPPORTED_EVALUATORS = {
+    "ragas_faithfulness": RagasFaithfulnessEvaluator,
+}
+
+
+class EvaluatorRunner(PeriodicService):
+    """Periodic service that runs the configured evaluators over LLM Observability span events"""
+
+    def __init__(self, interval: float, _evaluation_metric_writer=None, _llmobs_service=None):
+        super(EvaluatorRunner, self).__init__(interval=interval)
+        self._lock = forksafe.RLock()
+        self._buffer = []  # type: list[Dict]
+        self._buffer_limit = 1000
+
+        self._evaluation_metric_writer = _evaluation_metric_writer
+
+        self.executor = futures.ThreadPoolExecutor()
+        self.evaluators = []
+
+        evaluator_str = os.getenv("_DD_LLMOBS_EVALUATORS")
+        if evaluator_str is not None:
+            evaluators = evaluator_str.split(",")
+            for evaluator in evaluators:
+                if evaluator in SUPPORTED_EVALUATORS:
+                    self.evaluators.append(SUPPORTED_EVALUATORS[evaluator](llmobs_service=_llmobs_service))
+
+    def start(self, *args, **kwargs):
+        super(EvaluatorRunner, self).start()
+        logger.debug("started %r", self.__class__.__name__)
+        atexit.register(self.on_shutdown)
+
+    def on_shutdown(self):
+        self.executor.shutdown()
+
+    def enqueue(self, span_event: Dict) -> None:
+        with self._lock:
+            if len(self._buffer) >= self._buffer_limit:
+                logger.warning(
+                    "%r event buffer full (limit is %d), dropping event", self.__class__.__name__, self._buffer_limit
+                )
+                return
+            self._buffer.append(span_event)
+
+    def periodic(self) -> None:
+        with self._lock:
+            if not self._buffer:
+                return
+            events = self._buffer
+            self._buffer = []
+
+        try:
+            evaluation_metrics = self.run(events)
+            for metric in evaluation_metrics:
+                if metric is not None:
+                    self._evaluation_metric_writer.enqueue(metric)
+        except RuntimeError as e:
+            logger.debug("failed to run evaluation: %s", e)
+
+    def run(self, spans):
+        batches_of_results = []
+
+        for evaluator in self.evaluators:
+            batches_of_results.append(self.executor.map(evaluator.evaluate, spans))
+
+        results = []
+        for batch in batches_of_results:
+            for result in batch:
+                results.append(result)
+
+        return results
diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py
index c8d07cec93..3b6efd8f49 100644
--- a/ddtrace/llmobs/_llmobs.py
+++ b/ddtrace/llmobs/_llmobs.py
@@ -40,6 +40,7 @@
 from ddtrace.llmobs._constants import SPAN_KIND
 from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
 from ddtrace.llmobs._constants import TAGS
+from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
 from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor
 from ddtrace.llmobs._utils import AnnotationContext
 from ddtrace.llmobs._utils import _get_llmobs_parent_id
@@ -88,7 +89,13 @@ def __init__(self, tracer=None):
             interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
             timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
         )
-        self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer)
+
+        self._evaluator_runner = EvaluatorRunner(
+            interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)),
+            _evaluation_metric_writer=self._llmobs_eval_metric_writer,
+        )
+
+        self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluator_runner)
         forksafe.register(self._child_after_fork)
 
     def _child_after_fork(self):
@@ -114,6 +121,11 @@ def _start_service(self) -> None:
         except ServiceStatusError:
             log.debug("Error starting LLMObs writers")
 
+        try:
+            self._evaluator_runner.start()
+        except ServiceStatusError:
+            log.debug("Error starting evaluator runner")
+
     def _stop_service(self) -> None:
         try:
             self._llmobs_span_writer.stop()
@@ -121,6 +133,11 @@ def _stop_service(self) -> None:
         except ServiceStatusError:
             log.debug("Error stopping LLMObs writers")
 
+        try:
+            self._evaluator_runner.stop()
+        except ServiceStatusError:
+            log.debug("Error stopping evaluator runner")
+
         try:
             forksafe.unregister(self._child_after_fork)
             self.tracer.shutdown()
@@ -156,7 +173,6 @@ def enable(
         if os.getenv("DD_LLMOBS_ENABLED") and not asbool(os.getenv("DD_LLMOBS_ENABLED")):
             log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service")
-            return
 
         # grab required values for LLMObs
         config._dd_site = site or config._dd_site
diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py
index 5a654a8fb9..e959704238 100644
--- a/ddtrace/llmobs/_trace_processor.py
+++ b/ddtrace/llmobs/_trace_processor.py
@@ -44,8 +44,9 @@ class LLMObsTraceProcessor(TraceProcessor):
     Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability.
""" - def __init__(self, llmobs_span_writer): + def __init__(self, llmobs_span_writer, evaluator_runner=None): self._span_writer = llmobs_span_writer + self._evaluator_runner = evaluator_runner def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: if not trace: @@ -57,11 +58,17 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: def submit_llmobs_span(self, span: Span) -> None: """Generate and submit an LLMObs span event to be sent to LLMObs.""" + span_event = None try: span_event = self._llmobs_span_event(span) self._span_writer.enqueue(span_event) except (KeyError, TypeError): log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span) + finally: + if not span_event: + return + if self._evaluator_runner: + self._evaluator_runner.enqueue(span_event) def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: """Span event object structure.""" @@ -113,6 +120,7 @@ def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: "status": "error" if span.error else "ok", "meta": meta, "metrics": metrics, + "ml_app": ml_app, } session_id = _get_session_id(span) if session_id is not None: diff --git a/tests/llmobs/_utils.py b/tests/llmobs/_utils.py index 308e420ddf..f0013ce431 100644 --- a/tests/llmobs/_utils.py +++ b/tests/llmobs/_utils.py @@ -200,6 +200,10 @@ def _llmobs_base_span_event( span_event["meta"]["error.type"] = error span_event["meta"]["error.message"] = error_message span_event["meta"]["error.stack"] = error_stack + if tags: + span_event["ml_app"] = tags.get("ml_app", "unnamed-ml-app") + else: + span_event["ml_app"] = "unnamed-ml-app" return span_event diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py index 66c16ae180..48d34215e4 100644 --- a/tests/llmobs/conftest.py +++ b/tests/llmobs/conftest.py @@ -58,6 +58,16 @@ def mock_llmobs_eval_metric_writer(): patcher.stop() +@pytest.fixture +def mock_llmobs_ragas_evaluator(): + patcher = mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.run") + RagasEvaluator = patcher.start() + m = mock.MagicMock() + RagasEvaluator.return_value = m + yield m + patcher.stop() + + @pytest.fixture def mock_http_writer_send_payload_response(): with mock.patch( @@ -88,6 +98,12 @@ def mock_writer_logs(): yield m +@pytest.fixture +def mock_evaluator_logs(): + with mock.patch("ddtrace.llmobs._evaluators.runner.logger") as m: + yield m + + @pytest.fixture def mock_http_writer_logs(): with mock.patch("ddtrace.internal.writer.writer.log") as m: @@ -125,3 +141,16 @@ def AgentlessLLMObs(mock_llmobs_span_agentless_writer, mock_llmobs_eval_metric_w llmobs_service.enable(_tracer=dummy_tracer) yield llmobs_service llmobs_service.disable() + + +@pytest.fixture +def LLMObsWithRagas(monkeypatch, mock_llmobs_span_writer, mock_llmobs_eval_metric_writer, ddtrace_global_config): + global_config = default_global_config() + global_config.update(ddtrace_global_config) + monkeypatch.setenv("_DD_LLMOBS_EVALUATORS", "ragas_faithfulness") + monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_INTERVAL", "0.1") + with override_global_config(global_config): + dummy_tracer = DummyTracer() + llmobs_service.enable(_tracer=dummy_tracer) + yield llmobs_service + llmobs_service.disable() diff --git a/tests/llmobs/test_llmobs_evaluator_runner.py b/tests/llmobs/test_llmobs_evaluator_runner.py new file mode 100644 index 0000000000..603a2cb150 --- /dev/null +++ b/tests/llmobs/test_llmobs_evaluator_runner.py @@ -0,0 +1,126 @@ +import os +import time + +import mock +import pytest + +from 
+from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
+from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent
+
+
+INTAKE_ENDPOINT = "https://api.datad0g.com/api/intake/llm-obs/v1/eval-metric"
+DD_SITE = "datad0g.com"
+dd_api_key = os.getenv("DD_API_KEY", default="")
+
+
+def _categorical_metric_event():
+    return {
+        "span_id": "12345678901",
+        "trace_id": "98765432101",
+        "metric_type": "categorical",
+        "categorical_value": "very",
+        "label": "toxicity",
+        "ml_app": "dummy-ml-app",
+        "timestamp_ms": round(time.time() * 1000),
+    }
+
+
+def _score_metric_event():
+    return {
+        "span_id": "12345678902",
+        "trace_id": "98765432102",
+        "metric_type": "score",
+        "label": "sentiment",
+        "score_value": 0.9,
+        "ml_app": "dummy-ml-app",
+        "timestamp_ms": round(time.time() * 1000),
+    }
+
+
+def _dummy_ragas_eval_metric_event(span_id, trace_id):
+    return LLMObsEvaluationMetricEvent(
+        span_id=span_id,
+        trace_id=trace_id,
+        score_value=1,
+        ml_app="unnamed-ml-app",
+        timestamp_ms=mock.ANY,
+        metric_type="score",
+        label="ragas_faithfulness",
+    )
+
+
+def test_evaluator_runner_start(mock_evaluator_logs):
+    evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock.MagicMock())
+    evaluator_runner.start()
+    mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r", "EvaluatorRunner")])
+
+
+def test_evaluator_runner_buffer_limit(mock_evaluator_logs):
+    evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock.MagicMock())
+    for _ in range(1001):
+        evaluator_runner.enqueue({})
+    mock_evaluator_logs.warning.assert_called_with(
+        "%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000
+    )
+
+
+def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer):
+    evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer)
+    evaluator_runner.evaluators.append(RagasFaithfulnessEvaluator)
+    evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"})
+    evaluator_runner.periodic()
+    mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
+        _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
+    )
+
+
+@pytest.mark.vcr_logs
+def test_ragas_faithfulness_evaluator_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer):
+    evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer)
+    evaluator_runner.evaluators.append(RagasFaithfulnessEvaluator)
+    evaluator_runner.start()
+
+    evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"})
+
+    time.sleep(0.1)
+
+    mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
+        _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
+    )
+
+
+def test_evaluator_runner_on_exit(mock_writer_logs, run_python_code_in_subprocess):
+    out, err, status, pid = run_python_code_in_subprocess(
+        """
+import os
+import time
+import mock
+
+from ddtrace.internal.utils.http import Response
+from ddtrace.llmobs._writer import LLMObsEvalMetricWriter
+from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
+from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator
+
+with mock.patch(
+    "ddtrace.internal.writer.HTTPWriter._send_payload",
+    return_value=Response(
+        status=200,
+        body="{}",
+    ),
+):
+    llmobs_eval_metric_writer = LLMObsEvalMetricWriter(
+        site="datad0g.com", api_key=os.getenv("DD_API_KEY_STAGING"), interval=0.01, timeout=1
+    )
+    llmobs_eval_metric_writer.start()
+    evaluator_runner = EvaluatorRunner(
+        interval=0.01, _evaluation_metric_writer=llmobs_eval_metric_writer
+    )
+    evaluator_runner.evaluators.append(RagasFaithfulnessEvaluator)
+    evaluator_runner.start()
+    evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"})
+""",
+    )
+    assert status == 0, err
+    assert out == b""
+    assert err == b""
diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py
index b49bc4298c..30884747ed 100644
--- a/tests/llmobs/test_llmobs_service.py
+++ b/tests/llmobs/test_llmobs_service.py
@@ -1,5 +1,6 @@
 import json
 import os
+import time
 
 import mock
 import pytest
@@ -89,6 +90,7 @@ def test_service_disable():
     assert llmobs_service.enabled is False
     assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped"
     assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped"
+    assert llmobs_service._instance._evaluator_runner.status.value == "stopped"
 
 
 def test_service_enable_no_api_key():
@@ -99,6 +101,7 @@ def test_service_enable_no_api_key():
     assert llmobs_service.enabled is False
     assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped"
     assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped"
+    assert llmobs_service._instance._evaluator_runner.status.value == "stopped"
 
 
 def test_service_enable_no_ml_app_specified():
@@ -109,6 +112,7 @@ def test_service_enable_no_ml_app_specified():
     assert llmobs_service.enabled is False
     assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped"
     assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped"
+    assert llmobs_service._instance._evaluator_runner.status.value == "stopped"
 
 
 def test_service_enable_deprecated_ml_app_name(monkeypatch, mock_logs):
@@ -119,6 +123,7 @@ def test_service_enable_deprecated_ml_app_name(monkeypatch, mock_logs):
     assert llmobs_service.enabled is True
     assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "running"
     assert llmobs_service._instance._llmobs_span_writer.status.value == "running"
+    assert llmobs_service._instance._evaluator_runner.status.value == "running"
     mock_logs.warning.assert_called_once_with("`DD_LLMOBS_APP_NAME` is deprecated. Use `DD_LLMOBS_ML_APP` instead.")
     llmobs_service.disable()
 
@@ -1525,6 +1530,13 @@ def process_trace(self, trace):
         llmobs_service.disable()
 
 
+def test_llm_span_ragas_evaluator(LLMObsWithRagas):
+    with LLMObsWithRagas.llm(model_name="test_model"):
+        pass
+    time.sleep(0.1)
+    assert LLMObsWithRagas._instance._llmobs_eval_metric_writer.enqueue.call_count == 1
+
+
 def test_annotation_context_modifies_span_tags(LLMObs):
     with LLMObs.annotation_context(tags={"foo": "bar"}):
        with LLMObs.agent(name="test_agent") as span:
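
Usage note (reviewer sketch, not part of the diff): a minimal example of how the new evaluator runner is expected to be exercised end to end, based only on the environment variables and public LLMObs APIs that appear in this patch. The ml_app and model names are placeholders, Datadog credentials (DD_API_KEY, site) are assumed to be configured in the environment, and the LLM used by ragas' default llm_factory() needs its own API key.

    import os

    # Opt in to the ragas faithfulness evaluator before enabling LLM Observability;
    # EvaluatorRunner only instantiates evaluators listed in _DD_LLMOBS_EVALUATORS.
    os.environ["_DD_LLMOBS_EVALUATORS"] = "ragas_faithfulness"
    os.environ["_DD_LLMOBS_EVALUATOR_INTERVAL"] = "1.0"

    from ddtrace.llmobs import LLMObs

    LLMObs.enable(ml_app="my-ml-app")  # placeholder ml_app name

    # Each finished LLM span event is enqueued to the EvaluatorRunner by the trace
    # processor; the runner periodically calls RagasFaithfulnessEvaluator.evaluate()
    # on it and submits a "ragas_faithfulness" score metric via the eval metric writer.
    with LLMObs.llm(model_name="my-model") as span:  # placeholder model name
        LLMObs.annotate(
            span,
            input_data=[{"role": "user", "content": "question: ...\ncontext: ..."}],
            output_data=[{"role": "assistant", "content": "an answer grounded in the context"}],
        )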