(Refactor) Langfuse - remove prepare_metadata, langfuse python SDK now handles non-json serializable objects (#7925)

* test_langfuse_logging_completion_with_langfuse_metadata

* fix litellm - remove prepare metadata

* test_langfuse_logging_with_non_serializable_metadata

* detailed e2e langfuse metadata tests

* clean up langfuse logging

* fix langfuse

* remove unused imports

* fix code qa checks

* fix _prepare_metadata
ishaan-jaff authored Jan 23, 2025
1 parent 27560bd commit 53a3ea3
Showing 16 changed files with 1,231 additions and 133 deletions.
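The e2e tests added here (e.g. test_langfuse_logging_with_langfuse_metadata and test_langfuse_logging_with_non_serializable_metadata) exercise the scenario below. A minimal sketch of what a caller can now rely on, assuming a configured Langfuse callback and valid API keys; the metadata values are illustrative, not taken from the test suite:

    import litellm

    # Send LiteLLM success logs to Langfuse (assumes LANGFUSE_PUBLIC_KEY /
    # LANGFUSE_SECRET_KEY are set in the environment).
    litellm.success_callback = ["langfuse"]

    response = litellm.completion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hello!"}],
        # Mixed metadata: the Langfuse SDK now serializes non-JSON types
        # itself, so LiteLLM no longer deep-copies and filters this dict
        # before logging.
        metadata={
            "string_value": "hello",
            "nested_dict": {"key1": "value1"},
            "set_value": {1, 2, 3},  # not a JSON type; logged as a list
        },
    )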
110 changes: 24 additions & 86 deletions litellm/integrations/langfuse/langfuse.py
@@ -3,11 +3,9 @@
 import copy
 import os
 import traceback
-from collections.abc import MutableMapping, MutableSequence, MutableSet
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast

 from packaging.version import Version
-from pydantic import BaseModel

 import litellm
 from litellm._logging import verbose_logger
@@ -71,8 +69,9 @@ def __init__(
             "flush_interval": self.langfuse_flush_interval,  # flush interval in seconds
             "httpx_client": self.langfuse_client,
         }
+        self.langfuse_sdk_version: str = langfuse.version.__version__

-        if Version(langfuse.version.__version__) >= Version("2.6.0"):
+        if Version(self.langfuse_sdk_version) >= Version("2.6.0"):
             parameters["sdk_integration"] = "litellm"

         self.Langfuse = Langfuse(**parameters)
@@ -360,73 +359,6 @@ def _log_langfuse_v1(
             )
         )

-    def is_base_type(self, value: Any) -> bool:
-        # Check if the value is of a base type
-        base_types = (int, float, str, bool, list, dict, tuple)
-        return isinstance(value, base_types)
-
-    def _prepare_metadata(self, metadata: Optional[dict]) -> Any:
-        try:
-            if metadata is None:
-                return None
-
-            # Filter out function types from the metadata
-            sanitized_metadata = {k: v for k, v in metadata.items() if not callable(v)}
-
-            return copy.deepcopy(sanitized_metadata)
-        except Exception as e:
-            verbose_logger.debug(f"Langfuse Layer Error - {e}, metadata: {metadata}")
-
-        new_metadata: Dict[str, Any] = {}
-
-        # if metadata is not a MutableMapping, return an empty dict since we can't call items() on it
-        if not isinstance(metadata, MutableMapping):
-            verbose_logger.debug(
-                "Langfuse Layer Logging - metadata is not a MutableMapping, returning empty dict"
-            )
-            return new_metadata
-
-        for key, value in metadata.items():
-            try:
-                if isinstance(value, MutableMapping):
-                    new_metadata[key] = self._prepare_metadata(cast(dict, value))
-                elif isinstance(value, MutableSequence):
-                    # For lists or other mutable sequences
-                    new_metadata[key] = list(
-                        (
-                            self._prepare_metadata(cast(dict, v))
-                            if isinstance(v, MutableMapping)
-                            else copy.deepcopy(v)
-                        )
-                        for v in value
-                    )
-                elif isinstance(value, MutableSet):
-                    # For sets specifically, create a new set by passing an iterable
-                    new_metadata[key] = set(
-                        (
-                            self._prepare_metadata(cast(dict, v))
-                            if isinstance(v, MutableMapping)
-                            else copy.deepcopy(v)
-                        )
-                        for v in value
-                    )
-                elif isinstance(value, BaseModel):
-                    new_metadata[key] = value.model_dump()
-                elif self.is_base_type(value):
-                    new_metadata[key] = value
-                else:
-                    verbose_logger.debug(
-                        f"Langfuse Layer Error - Unsupported metadata type: {type(value)} for key: {key}"
-                    )
-                    continue
-
-            except (TypeError, copy.Error):
-                verbose_logger.debug(
-                    f"Langfuse Layer Error - Couldn't copy metadata key: {key}, type of key: {type(key)}, type of value: {type(value)} - {traceback.format_exc()}"
-                )
-
-        return new_metadata
-
     def _log_langfuse_v2(  # noqa: PLR0915
         self,
         user_id,
@@ -443,27 +375,17 @@ def _log_langfuse_v2(  # noqa: PLR0915
         print_verbose,
         litellm_call_id,
     ) -> tuple:
-        import langfuse
-
         verbose_logger.debug("Langfuse Layer Logging - logging to langfuse v2")

         try:
-            metadata = self._prepare_metadata(metadata)
-
-            langfuse_version = Version(langfuse.version.__version__)
-
-            supports_tags = langfuse_version >= Version("2.6.3")
-            supports_prompt = langfuse_version >= Version("2.7.3")
-            supports_costs = langfuse_version >= Version("2.7.3")
-            supports_completion_start_time = langfuse_version >= Version("2.7.3")
-
+            metadata = metadata or {}
             standard_logging_object: Optional[StandardLoggingPayload] = cast(
                 Optional[StandardLoggingPayload],
                 kwargs.get("standard_logging_object", None),
             )
             tags = (
                 self._get_langfuse_tags(standard_logging_object=standard_logging_object)
-                if supports_tags
+                if self._supports_tags()
                 else []
             )
@@ -624,7 +546,7 @@ def _log_langfuse_v2(  # noqa: PLR0915
             if aws_region_name:
                 clean_metadata["aws_region_name"] = aws_region_name

-            if supports_tags:
+            if self._supports_tags():
                 if "cache_hit" in kwargs:
                     if kwargs["cache_hit"] is None:
                         kwargs["cache_hit"] = False
@@ -670,7 +592,7 @@ def _log_langfuse_v2(  # noqa: PLR0915
             usage = {
                 "prompt_tokens": _usage_obj.prompt_tokens,
                 "completion_tokens": _usage_obj.completion_tokens,
-                "total_cost": cost if supports_costs else None,
+                "total_cost": cost if self._supports_costs() else None,
             }
             generation_name = clean_metadata.pop("generation_name", None)
             if generation_name is None:
@@ -713,7 +635,7 @@ def _log_langfuse_v2(  # noqa: PLR0915
             if parent_observation_id is not None:
                 generation_params["parent_observation_id"] = parent_observation_id

-            if supports_prompt:
+            if self._supports_prompt():
                 generation_params = _add_prompt_to_generation_params(
                     generation_params=generation_params,
                     clean_metadata=clean_metadata,
@@ -723,7 +645,7 @@ def _log_langfuse_v2(  # noqa: PLR0915
             if output is not None and isinstance(output, str) and level == "ERROR":
                 generation_params["status_message"] = output

-            if supports_completion_start_time:
+            if self._supports_completion_start_time():
                 generation_params["completion_start_time"] = kwargs.get(
                     "completion_start_time", None
                 )
@@ -770,6 +692,22 @@ def add_default_langfuse_tags(self, tags, kwargs, metadata):
             tags.append(f"cache_key:{_cache_key}")
         return tags

+    def _supports_tags(self):
+        """Check if current langfuse version supports tags"""
+        return Version(self.langfuse_sdk_version) >= Version("2.6.3")
+
+    def _supports_prompt(self):
+        """Check if current langfuse version supports prompt"""
+        return Version(self.langfuse_sdk_version) >= Version("2.7.3")
+
+    def _supports_costs(self):
+        """Check if current langfuse version supports costs"""
+        return Version(self.langfuse_sdk_version) >= Version("2.7.3")
+
+    def _supports_completion_start_time(self):
+        """Check if current langfuse version supports completion start time"""
+        return Version(self.langfuse_sdk_version) >= Version("2.7.3")
+

 def _add_prompt_to_generation_params(
     generation_params: dict,
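For context on the deleted helper: _prepare_metadata existed because copy.deepcopy raises on some metadata values (locks, sockets, SDK clients), which forced the key-by-key fallback shown in the removed block above. A minimal illustration of that failure mode, using a hypothetical Unpicklable class:

    import copy

    class Unpicklable:
        # Raising in __deepcopy__ simulates objects (locks, sockets,
        # SDK clients) that cannot be deep-copied.
        def __deepcopy__(self, memo):
            raise TypeError("cannot deepcopy this object")

    metadata = {"client": Unpicklable(), "run_id": 42}

    try:
        copy.deepcopy(metadata)
    except TypeError as err:
        # The removed fallback caught errors like this and copied values
        # one key at a time, skipping unsupported entries.
        print(f"deepcopy failed: {err}")

With newer Langfuse SDK versions handling serialization of such values, that defensive copying is no longer LiteLLM's job.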
3 changes: 3 additions & 0 deletions litellm/integrations/langfuse/langfuse_prompt_management.py
@@ -107,6 +107,9 @@ def __init__(
         langfuse_host=None,
         flush_interval=1,
     ):
+        import langfuse
+
+        self.langfuse_sdk_version = langfuse.version.__version__
         self.Langfuse = langfuse_client_init(
             langfuse_public_key=langfuse_public_key,
             langfuse_secret=langfuse_secret,
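Both logger classes now cache langfuse.version.__version__ once at init, and the new _supports_* helpers gate features with packaging.version.Version, which compares semantically rather than lexicographically. A small sketch with assumed version strings:

    from packaging.version import Version

    # Semantic comparison: a naive string compare would get this wrong,
    # since "2.10.0" < "2.6.3" lexicographically.
    assert Version("2.10.0") > Version("2.6.3")
    assert Version("2.7.3") >= Version("2.7.3")

    def supports_tags(sdk_version: str) -> bool:
        # Mirrors the new LangFuseLogger._supports_tags gate.
        return Version(sdk_version) >= Version("2.6.3")

    print(supports_tags("2.44.1"))  # True for the SDK version in the mock below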
1 change: 0 additions & 1 deletion tests/code_coverage_tests/recursive_detector.py
@@ -8,7 +8,6 @@
     "text_completion",
     "_check_for_os_environ_vars",
     "clean_message",
-    "_prepare_metadata",
     "unpack_defs",
     "convert_to_nullable",
     "add_object_type",
@@ -0,0 +1,126 @@
{
  "batch": [
    {
      "id": "9ee9100b-c4aa-4e40-a10d-bc189f8b4242",
      "type": "trace-create",
      "body": {
        "id": "litellm-test-c414db10-dd68-406e-9d9e-03839bc2f346",
        "timestamp": "2025-01-22T17:27:51.702596Z",
        "name": "litellm-acompletion",
        "input": {
          "messages": [
            {
              "role": "user",
              "content": "Hello!"
            }
          ]
        },
        "output": {
          "content": "Hello! How can I assist you today?",
          "role": "assistant",
          "tool_calls": null,
          "function_call": null
        },
        "tags": []
      },
      "timestamp": "2025-01-22T17:27:51.702716Z"
    },
    {
      "id": "f8d20489-ed58-429f-b609-87380e223746",
      "type": "generation-create",
      "body": {
        "traceId": "litellm-test-c414db10-dd68-406e-9d9e-03839bc2f346",
        "name": "litellm-acompletion",
        "startTime": "2025-01-22T09:27:51.150898-08:00",
        "metadata": {
          "string_value": "hello",
          "int_value": 42,
          "float_value": 3.14,
          "bool_value": true,
          "nested_dict": {
            "key1": "value1",
            "key2": {
              "inner_key": "inner_value"
            }
          },
          "list_value": [
            1,
            2,
            3
          ],
          "set_value": [
            1,
            2,
            3
          ],
          "complex_list": [
            {
              "dict_in_list": "value"
            },
            "simple_string",
            [
              1,
              2,
              3
            ]
          ],
          "user": {
            "name": "John",
            "age": 30,
            "tags": [
              "customer",
              "active"
            ]
          },
          "hidden_params": {
            "model_id": null,
            "cache_key": null,
            "api_base": "https://api.openai.com",
            "response_cost": 5.4999999999999995e-05,
            "additional_headers": {},
            "litellm_overhead_time_ms": null
          },
          "litellm_response_cost": 5.4999999999999995e-05,
          "cache_hit": false,
          "requester_metadata": {}
        },
        "input": {
          "messages": [
            {
              "role": "user",
              "content": "Hello!"
            }
          ]
        },
        "output": {
          "content": "Hello! How can I assist you today?",
          "role": "assistant",
          "tool_calls": null,
          "function_call": null
        },
        "level": "DEFAULT",
        "id": "time-09-27-51-150898_chatcmpl-b783291c-dc76-4660-bfef-b79be9d54e57",
        "endTime": "2025-01-22T09:27:51.702048-08:00",
        "completionStartTime": "2025-01-22T09:27:51.702048-08:00",
        "model": "gpt-3.5-turbo",
        "modelParameters": {
          "extra_body": "{}"
        },
        "usage": {
          "input": 10,
          "output": 20,
          "unit": "TOKENS",
          "totalCost": 5.4999999999999995e-05
        }
      },
      "timestamp": "2025-01-22T17:27:51.703046Z"
    }
  ],
  "metadata": {
    "batch_size": 2,
    "sdk_integration": "litellm",
    "sdk_name": "python",
    "sdk_version": "2.44.1",
    "public_key": "pk-lf-e02aaea3-8668-4c9f-8c69-771a4ea1f5c9"
  }
}
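A hypothetical sketch of how a test could assert against a recorded batch like the one above (the function and file-path parameter are illustrative; the actual e2e tests live elsewhere in this commit):

    import json

    def assert_metadata_survived(batch_file: str) -> None:
        # Load a recorded Langfuse ingestion batch and verify that
        # non-JSON-native metadata made it through serialization.
        with open(batch_file) as f:
            payload = json.load(f)

        generation = next(
            item["body"]
            for item in payload["batch"]
            if item["type"] == "generation-create"
        )
        metadata = generation["metadata"]

        assert metadata["int_value"] == 42
        assert metadata["nested_dict"]["key2"]["inner_key"] == "inner_value"
        # Python sets are not a JSON type; the SDK emits them as lists.
        assert sorted(metadata["set_value"]) == [1, 2, 3]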