From 8a1b40aee9a8df28d015d235d21aad29fd406fa9 Mon Sep 17 00:00:00 2001 From: Matej Vetrak Date: Wed, 4 Sep 2024 15:02:20 +0200 Subject: [PATCH 01/25] Add support for async calls in Anthropic, OpenAI integrations --- sentry_sdk/integrations/anthropic.py | 204 +++++++++--------- sentry_sdk/integrations/openai.py | 304 +++++++++++++++------------ 2 files changed, 276 insertions(+), 232 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 41d8e9d7d5..5c4c24a51b 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -15,14 +15,13 @@ from typing import TYPE_CHECKING try: - from anthropic.resources import Messages + from anthropic.resources import Messages, AsyncMessages if TYPE_CHECKING: from anthropic.types import MessageStreamEvent except ImportError: raise DidNotEnable("Anthropic not installed") - if TYPE_CHECKING: from typing import Any, Iterator from sentry_sdk.tracing import Span @@ -48,6 +47,7 @@ def setup_once(): raise DidNotEnable("anthropic 0.16 or newer required.") Messages.create = _wrap_message_create(Messages.create) + AsyncMessages.create = _wrap_async_message_create(AsyncMessages.create) def _capture_exception(exc): @@ -75,105 +75,119 @@ def _calculate_token_usage(result, span): record_token_usage(span, input_tokens, output_tokens, total_tokens) +def _sentry_patched_create_common(f, *args, **kwargs): + # type: (Any, *Any, **Any) -> Any + if "messages" not in kwargs: + return f(*args, **kwargs) + + try: + iter(kwargs["messages"]) + except TypeError: + return f(*args, **kwargs) + + messages = list(kwargs["messages"]) + model = kwargs.get("model") + + span = sentry_sdk.start_span( + op=OP.ANTHROPIC_MESSAGES_CREATE, + description="Anthropic messages create", + origin=AnthropicIntegration.origin, + ) + span.__enter__() + + try: + result = f(*args, **kwargs) + except Exception as exc: + _capture_exception(exc) + span.__exit__(None, None, None) + raise exc from None + + integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) + + with capture_internal_exceptions(): + span.set_data(SPANDATA.AI_MODEL_ID, model) + span.set_data(SPANDATA.AI_STREAMING, False) + if should_send_default_pii() and integration.include_prompts: + span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages) + if hasattr(result, "content"): + if should_send_default_pii() and integration.include_prompts: + span.set_data( + SPANDATA.AI_RESPONSES, + list( + map( + lambda message: { + "type": message.type, + "text": message.text, + }, + result.content, + ) + ), + ) + _calculate_token_usage(result, span) + span.__exit__(None, None, None) + elif hasattr(result, "_iterator"): + old_iterator = result._iterator + + def new_iterator(): + # type: () -> Iterator[MessageStreamEvent] + input_tokens = 0 + output_tokens = 0 + content_blocks = [] + with capture_internal_exceptions(): + for event in old_iterator: + if hasattr(event, "type"): + if event.type == "message_start": + usage = event.message.usage + input_tokens += usage.input_tokens + output_tokens += usage.output_tokens + elif event.type == "content_block_start": + pass + elif event.type == "content_block_delta": + content_blocks.append(event.delta.text) + elif event.type == "content_block_stop": + pass + elif event.type == "message_delta": + output_tokens += event.usage.output_tokens + elif event.type == "message_stop": + continue + yield event + + if should_send_default_pii() and integration.include_prompts: + complete_message = "".join(content_blocks) + 
span.set_data( + SPANDATA.AI_RESPONSES, + [{"type": "text", "text": complete_message}], + ) + total_tokens = input_tokens + output_tokens + record_token_usage(span, input_tokens, output_tokens, total_tokens) + span.set_data(SPANDATA.AI_STREAMING, True) + span.__exit__(None, None, None) + + result._iterator = new_iterator() + else: + span.set_data("unknown_response", True) + span.__exit__(None, None, None) + + return result + + def _wrap_message_create(f): # type: (Any) -> Any @wraps(f) @ensure_integration_enabled(AnthropicIntegration, f) - def _sentry_patched_create(*args, **kwargs): + def _sentry_patched_create_sync(*args, **kwargs): # type: (*Any, **Any) -> Any - if "messages" not in kwargs: - return f(*args, **kwargs) - - try: - iter(kwargs["messages"]) - except TypeError: - return f(*args, **kwargs) - - messages = list(kwargs["messages"]) - model = kwargs.get("model") - - span = sentry_sdk.start_span( - op=OP.ANTHROPIC_MESSAGES_CREATE, - description="Anthropic messages create", - origin=AnthropicIntegration.origin, - ) - span.__enter__() - - try: - result = f(*args, **kwargs) - except Exception as exc: - _capture_exception(exc) - span.__exit__(None, None, None) - raise exc from None + return _sentry_patched_create_common(f, *args, **kwargs) - integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) - - with capture_internal_exceptions(): - span.set_data(SPANDATA.AI_MODEL_ID, model) - span.set_data(SPANDATA.AI_STREAMING, False) - if should_send_default_pii() and integration.include_prompts: - span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages) - if hasattr(result, "content"): - if should_send_default_pii() and integration.include_prompts: - span.set_data( - SPANDATA.AI_RESPONSES, - list( - map( - lambda message: { - "type": message.type, - "text": message.text, - }, - result.content, - ) - ), - ) - _calculate_token_usage(result, span) - span.__exit__(None, None, None) - elif hasattr(result, "_iterator"): - old_iterator = result._iterator - - def new_iterator(): - # type: () -> Iterator[MessageStreamEvent] - input_tokens = 0 - output_tokens = 0 - content_blocks = [] - with capture_internal_exceptions(): - for event in old_iterator: - if hasattr(event, "type"): - if event.type == "message_start": - usage = event.message.usage - input_tokens += usage.input_tokens - output_tokens += usage.output_tokens - elif event.type == "content_block_start": - pass - elif event.type == "content_block_delta": - content_blocks.append(event.delta.text) - elif event.type == "content_block_stop": - pass - elif event.type == "message_delta": - output_tokens += event.usage.output_tokens - elif event.type == "message_stop": - continue - yield event - - if should_send_default_pii() and integration.include_prompts: - complete_message = "".join(content_blocks) - span.set_data( - SPANDATA.AI_RESPONSES, - [{"type": "text", "text": complete_message}], - ) - total_tokens = input_tokens + output_tokens - record_token_usage( - span, input_tokens, output_tokens, total_tokens - ) - span.set_data(SPANDATA.AI_STREAMING, True) - span.__exit__(None, None, None) + return _sentry_patched_create_sync - result._iterator = new_iterator() - else: - span.set_data("unknown_response", True) - span.__exit__(None, None, None) - return result +def _wrap_async_message_create(f): + # type: (Any) -> Any + @wraps(f) + @ensure_integration_enabled(AnthropicIntegration, f) + async def _sentry_patched_create_async(*args, **kwargs): + # type: (*Any, **Any) -> Any + return await _sentry_patched_create_common(f, *args, **kwargs) 
- return _sentry_patched_create + return _sentry_patched_create_async diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index 5cf0817c87..076defa100 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -20,8 +20,8 @@ from sentry_sdk.tracing import Span try: - from openai.resources.chat.completions import Completions - from openai.resources import Embeddings + from openai.resources.chat.completions import Completions, AsyncCompletions + from openai.resources import Embeddings, AsyncEmbeddings if TYPE_CHECKING: from openai.types.chat import ChatCompletionMessageParam, ChatCompletionChunk @@ -47,7 +47,9 @@ def __init__(self, include_prompts=True, tiktoken_encoding_name=None): def setup_once(): # type: () -> None Completions.create = _wrap_chat_completion_create(Completions.create) + AsyncCompletions.create = _wrap_chat_completion_create(AsyncCompletions.create) Embeddings.create = _wrap_embeddings_create(Embeddings.create) + AsyncEmbeddings.create = _wrap_embeddings_create(AsyncEmbeddings.create) def count_tokens(self, s): # type: (OpenAIIntegration, str) -> int @@ -110,159 +112,187 @@ def _calculate_chat_completion_usage( record_token_usage(span, prompt_tokens, completion_tokens, total_tokens) +def _new_chat_completion_common(f, *args, **kwargs): + # type: (*Any, **Any) -> Any + if "messages" not in kwargs: + # invalid call (in all versions of openai), let it return error + return f(*args, **kwargs) + + try: + iter(kwargs["messages"]) + except TypeError: + # invalid call (in all versions), messages must be iterable + return f(*args, **kwargs) + + kwargs["messages"] = list(kwargs["messages"]) + messages = kwargs["messages"] + model = kwargs.get("model") + streaming = kwargs.get("stream") + + span = sentry_sdk.start_span( + op=consts.OP.OPENAI_CHAT_COMPLETIONS_CREATE, + description="Chat Completion", + origin=OpenAIIntegration.origin, + ) + span.__enter__() + try: + res = f(*args, **kwargs) + except Exception as e: + _capture_exception(e) + span.__exit__(None, None, None) + raise e from None + + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + + with capture_internal_exceptions(): + if should_send_default_pii() and integration.include_prompts: + set_data_normalized(span, SPANDATA.AI_INPUT_MESSAGES, messages) + + set_data_normalized(span, SPANDATA.AI_MODEL_ID, model) + set_data_normalized(span, SPANDATA.AI_STREAMING, streaming) + + if hasattr(res, "choices"): + if should_send_default_pii() and integration.include_prompts: + set_data_normalized( + span, + "ai.responses", + list(map(lambda x: x.message, res.choices)), + ) + _calculate_chat_completion_usage( + messages, res, span, None, integration.count_tokens + ) + span.__exit__(None, None, None) + elif hasattr(res, "_iterator"): + data_buf: list[list[str]] = [] # one for each choice + + old_iterator = res._iterator # type: Iterator[ChatCompletionChunk] + + def new_iterator(): + # type: () -> Iterator[ChatCompletionChunk] + with capture_internal_exceptions(): + for x in old_iterator: + if hasattr(x, "choices"): + choice_index = 0 + for choice in x.choices: + if hasattr(choice, "delta") and hasattr( + choice.delta, "content" + ): + content = choice.delta.content + if len(data_buf) <= choice_index: + data_buf.append([]) + data_buf[choice_index].append(content or "") + choice_index += 1 + yield x + if len(data_buf) > 0: + all_responses = list( + map(lambda chunk: "".join(chunk), data_buf) + ) + if should_send_default_pii() and 
integration.include_prompts: + set_data_normalized( + span, SPANDATA.AI_RESPONSES, all_responses + ) + _calculate_chat_completion_usage( + messages, + res, + span, + all_responses, + integration.count_tokens, + ) + span.__exit__(None, None, None) + + res._iterator = new_iterator() + else: + set_data_normalized(span, "unknown_response", True) + span.__exit__(None, None, None) + return res + + def _wrap_chat_completion_create(f): # type: (Callable[..., Any]) -> Callable[..., Any] + @wraps(f) + @ensure_integration_enabled(OpenAIIntegration, f) + def _sentry_patched_create_sync(*args, **kwargs): + # type: (*Any, **Any) -> Any + return _new_chat_completion_common(f, *args, **kwargs) + + return _sentry_patched_create_sync + +def _wrap_async_chat_completion_create(f): + # type: (Callable[..., Any]) -> Callable[..., Any] + @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) - def new_chat_completion(*args, **kwargs): + async def _sentry_patched_create_async(*args, **kwargs): # type: (*Any, **Any) -> Any - if "messages" not in kwargs: - # invalid call (in all versions of openai), let it return error - return f(*args, **kwargs) + return await _new_chat_completion_common(f, *args, **kwargs) + return _sentry_patched_create_async + + +def _new_embeddings_create_common(f, *args, **kwargs): + # type: (*Any, **Any) -> Any + with sentry_sdk.start_span( + op=consts.OP.OPENAI_EMBEDDINGS_CREATE, + description="OpenAI Embedding Creation", + origin=OpenAIIntegration.origin, + ) as span: + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if "input" in kwargs and ( + should_send_default_pii() and integration.include_prompts + ): + if isinstance(kwargs["input"], str): + set_data_normalized(span, "ai.input_messages", [kwargs["input"]]) + elif ( + isinstance(kwargs["input"], list) + and len(kwargs["input"]) > 0 + and isinstance(kwargs["input"][0], str) + ): + set_data_normalized(span, "ai.input_messages", kwargs["input"]) + if "model" in kwargs: + set_data_normalized(span, "ai.model_id", kwargs["model"]) try: - iter(kwargs["messages"]) - except TypeError: - # invalid call (in all versions), messages must be iterable - return f(*args, **kwargs) - - kwargs["messages"] = list(kwargs["messages"]) - messages = kwargs["messages"] - model = kwargs.get("model") - streaming = kwargs.get("stream") - - span = sentry_sdk.start_span( - op=consts.OP.OPENAI_CHAT_COMPLETIONS_CREATE, - description="Chat Completion", - origin=OpenAIIntegration.origin, - ) - span.__enter__() - try: - res = f(*args, **kwargs) + response = f(*args, **kwargs) except Exception as e: _capture_exception(e) - span.__exit__(None, None, None) raise e from None - integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + prompt_tokens = 0 + total_tokens = 0 + if hasattr(response, "usage"): + if hasattr(response.usage, "prompt_tokens") and isinstance( + response.usage.prompt_tokens, int + ): + prompt_tokens = response.usage.prompt_tokens + if hasattr(response.usage, "total_tokens") and isinstance( + response.usage.total_tokens, int + ): + total_tokens = response.usage.total_tokens - with capture_internal_exceptions(): - if should_send_default_pii() and integration.include_prompts: - set_data_normalized(span, SPANDATA.AI_INPUT_MESSAGES, messages) - - set_data_normalized(span, SPANDATA.AI_MODEL_ID, model) - set_data_normalized(span, SPANDATA.AI_STREAMING, streaming) - - if hasattr(res, "choices"): - if should_send_default_pii() and integration.include_prompts: - set_data_normalized( - span, - "ai.responses", - 
list(map(lambda x: x.message, res.choices)), - ) - _calculate_chat_completion_usage( - messages, res, span, None, integration.count_tokens - ) - span.__exit__(None, None, None) - elif hasattr(res, "_iterator"): - data_buf: list[list[str]] = [] # one for each choice - - old_iterator = res._iterator # type: Iterator[ChatCompletionChunk] - - def new_iterator(): - # type: () -> Iterator[ChatCompletionChunk] - with capture_internal_exceptions(): - for x in old_iterator: - if hasattr(x, "choices"): - choice_index = 0 - for choice in x.choices: - if hasattr(choice, "delta") and hasattr( - choice.delta, "content" - ): - content = choice.delta.content - if len(data_buf) <= choice_index: - data_buf.append([]) - data_buf[choice_index].append(content or "") - choice_index += 1 - yield x - if len(data_buf) > 0: - all_responses = list( - map(lambda chunk: "".join(chunk), data_buf) - ) - if ( - should_send_default_pii() - and integration.include_prompts - ): - set_data_normalized( - span, SPANDATA.AI_RESPONSES, all_responses - ) - _calculate_chat_completion_usage( - messages, - res, - span, - all_responses, - integration.count_tokens, - ) - span.__exit__(None, None, None) + if prompt_tokens == 0: + prompt_tokens = integration.count_tokens(kwargs["input"] or "") - res._iterator = new_iterator() - else: - set_data_normalized(span, "unknown_response", True) - span.__exit__(None, None, None) - return res + record_token_usage(span, prompt_tokens, None, total_tokens or prompt_tokens) - return new_chat_completion + return response def _wrap_embeddings_create(f): - # type: (Callable[..., Any]) -> Callable[..., Any] + # type: (Any) -> Any + @wraps(f) + @ensure_integration_enabled(OpenAIIntegration, f) + def _sentry_patched_create_sync(*args, **kwargs): + # type: (*Any, **Any) -> Any + return _new_embeddings_create_common(f, *args, **kwargs) + + return _sentry_patched_create_sync + +def _wrap_async_embeddings_create(f): + # type: (Any) -> Any @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) - def new_embeddings_create(*args, **kwargs): + async def _sentry_patched_create_async(*args, **kwargs): # type: (*Any, **Any) -> Any - with sentry_sdk.start_span( - op=consts.OP.OPENAI_EMBEDDINGS_CREATE, - description="OpenAI Embedding Creation", - origin=OpenAIIntegration.origin, - ) as span: - integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) - if "input" in kwargs and ( - should_send_default_pii() and integration.include_prompts - ): - if isinstance(kwargs["input"], str): - set_data_normalized(span, "ai.input_messages", [kwargs["input"]]) - elif ( - isinstance(kwargs["input"], list) - and len(kwargs["input"]) > 0 - and isinstance(kwargs["input"][0], str) - ): - set_data_normalized(span, "ai.input_messages", kwargs["input"]) - if "model" in kwargs: - set_data_normalized(span, "ai.model_id", kwargs["model"]) - try: - response = f(*args, **kwargs) - except Exception as e: - _capture_exception(e) - raise e from None - - prompt_tokens = 0 - total_tokens = 0 - if hasattr(response, "usage"): - if hasattr(response.usage, "prompt_tokens") and isinstance( - response.usage.prompt_tokens, int - ): - prompt_tokens = response.usage.prompt_tokens - if hasattr(response.usage, "total_tokens") and isinstance( - response.usage.total_tokens, int - ): - total_tokens = response.usage.total_tokens - - if prompt_tokens == 0: - prompt_tokens = integration.count_tokens(kwargs["input"] or "") - - record_token_usage(span, prompt_tokens, None, total_tokens or prompt_tokens) - - return response - - return 
new_embeddings_create + return await _new_embeddings_create_common(f, *args, **kwargs) + + return _sentry_patched_create_async From 397968234d9f2f884e0fb0b27042368b80c0626b Mon Sep 17 00:00:00 2001 From: Matej Vetrak Date: Wed, 4 Sep 2024 15:10:50 +0200 Subject: [PATCH 02/25] Fix types --- sentry_sdk/integrations/openai.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index 076defa100..a6debf17d4 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -113,7 +113,7 @@ def _calculate_chat_completion_usage( def _new_chat_completion_common(f, *args, **kwargs): - # type: (*Any, **Any) -> Any + # type: (Any, *Any, **Any) -> Any if "messages" not in kwargs: # invalid call (in all versions of openai), let it return error return f(*args, **kwargs) @@ -230,7 +230,7 @@ async def _sentry_patched_create_async(*args, **kwargs): def _new_embeddings_create_common(f, *args, **kwargs): - # type: (*Any, **Any) -> Any + # type: (Any, *Any, **Any) -> Any with sentry_sdk.start_span( op=consts.OP.OPENAI_EMBEDDINGS_CREATE, description="OpenAI Embedding Creation", From 99387d524d8713a0860d72048a013c95371fa7e2 Mon Sep 17 00:00:00 2001 From: Matej Vetrak Date: Wed, 4 Sep 2024 15:54:26 +0200 Subject: [PATCH 03/25] Add exec_sync and exec_async to correctly handle function execution in patched method --- sentry_sdk/integrations/anthropic.py | 19 +++++++++++--- sentry_sdk/integrations/openai.py | 38 +++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 5c4c24a51b..597c50d301 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -96,7 +96,7 @@ def _sentry_patched_create_common(f, *args, **kwargs): span.__enter__() try: - result = f(*args, **kwargs) + result = yield f(*args, **kwargs) except Exception as exc: _capture_exception(exc) span.__exit__(None, None, None) @@ -173,21 +173,34 @@ def new_iterator(): def _wrap_message_create(f): # type: (Any) -> Any + def _execute_sync(f, *args, **kwargs): + # type: (Any, *Any, **Any) -> Any + gen = _sentry_patched_create_common(f, *args, **kwargs) + f, args, kwargs = next(gen) + return gen.send(f(*args, **kwargs)) + @wraps(f) @ensure_integration_enabled(AnthropicIntegration, f) def _sentry_patched_create_sync(*args, **kwargs): # type: (*Any, **Any) -> Any - return _sentry_patched_create_common(f, *args, **kwargs) + return _execute_sync(f, *args, **kwargs) return _sentry_patched_create_sync def _wrap_async_message_create(f): # type: (Any) -> Any + + async def _execute_async(f, *args, **kwargs): + # type: (Any, *Any, **Any) -> Any + gen = _sentry_patched_create_common(f, *args, **kwargs) + f, args, kwargs = next(gen) + return gen.send(await f(*args, **kwargs)) + @wraps(f) @ensure_integration_enabled(AnthropicIntegration, f) async def _sentry_patched_create_async(*args, **kwargs): # type: (*Any, **Any) -> Any - return await _sentry_patched_create_common(f, *args, **kwargs) + return await _execute_async(f, *args, **kwargs) return _sentry_patched_create_async diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index a6debf17d4..aabc5e3d5a 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -136,7 +136,7 @@ def _new_chat_completion_common(f, *args, **kwargs): ) span.__enter__() try: - res = f(*args, **kwargs) + res = yield f(*args, 
**kwargs) except Exception as e: _capture_exception(e) span.__exit__(None, None, None) @@ -209,22 +209,35 @@ def new_iterator(): def _wrap_chat_completion_create(f): # type: (Callable[..., Any]) -> Callable[..., Any] + def _execute_sync(f, *args, **kwargs): + # type: (Any, *Any, **Any) -> Any + gen = _new_chat_completion_common(f, *args, **kwargs) + f, args, kwargs = next(gen) + return gen.send(f(*args, **kwargs)) + @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) def _sentry_patched_create_sync(*args, **kwargs): # type: (*Any, **Any) -> Any - return _new_chat_completion_common(f, *args, **kwargs) + return _execute_sync(f, *args, **kwargs) return _sentry_patched_create_sync def _wrap_async_chat_completion_create(f): # type: (Callable[..., Any]) -> Callable[..., Any] + + async def _execute_async(f, *args, **kwargs): + # type: (Any, *Any, **Any) -> Any + gen = _new_chat_completion_common(f, *args, **kwargs) + f, args, kwargs = next(gen) + return gen.send(await f(*args, **kwargs)) + @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) async def _sentry_patched_create_async(*args, **kwargs): # type: (*Any, **Any) -> Any - return await _new_chat_completion_common(f, *args, **kwargs) + return await _execute_async(f, *args, **kwargs) return _sentry_patched_create_async @@ -251,7 +264,7 @@ def _new_embeddings_create_common(f, *args, **kwargs): if "model" in kwargs: set_data_normalized(span, "ai.model_id", kwargs["model"]) try: - response = f(*args, **kwargs) + response = yield f(*args, **kwargs) except Exception as e: _capture_exception(e) raise e from None @@ -278,21 +291,34 @@ def _new_embeddings_create_common(f, *args, **kwargs): def _wrap_embeddings_create(f): # type: (Any) -> Any + + def _execute_sync(f, *args, **kwargs): + # type: (Any, *Any, **Any) -> Any + gen = _new_embeddings_create_common(f, *args, **kwargs) + f, args, kwargs = next(gen) + return gen.send(f(*args, **kwargs)) + @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) def _sentry_patched_create_sync(*args, **kwargs): # type: (*Any, **Any) -> Any - return _new_embeddings_create_common(f, *args, **kwargs) + return _execute_sync(f, *args, **kwargs) return _sentry_patched_create_sync def _wrap_async_embeddings_create(f): # type: (Any) -> Any + async def _execute_async(f, *args, **kwargs): + # type: (Any, *Any, **Any) -> Any + gen = _new_embeddings_create_common(f, *args, **kwargs) + f, args, kwargs = next(gen) + return gen.send(await f(*args, **kwargs)) + @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) async def _sentry_patched_create_async(*args, **kwargs): # type: (*Any, **Any) -> Any - return await _new_embeddings_create_common(f, *args, **kwargs) + return await _execute_async(f, *args, **kwargs) return _sentry_patched_create_async From cd71e105942bb92b226c0244519c0befff5843a6 Mon Sep 17 00:00:00 2001 From: Matej Vetrak Date: Wed, 4 Sep 2024 15:59:18 +0200 Subject: [PATCH 04/25] Correctly yield f, args, kwargs --- sentry_sdk/integrations/anthropic.py | 2 +- sentry_sdk/integrations/openai.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 597c50d301..2595e8d764 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -96,7 +96,7 @@ def _sentry_patched_create_common(f, *args, **kwargs): span.__enter__() try: - result = yield f(*args, **kwargs) + result = yield f, args, kwargs except Exception as exc: _capture_exception(exc) span.__exit__(None, None, 
None) diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index aabc5e3d5a..2903028471 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -136,7 +136,7 @@ def _new_chat_completion_common(f, *args, **kwargs): ) span.__enter__() try: - res = yield f(*args, **kwargs) + res = yield f, args, kwargs except Exception as e: _capture_exception(e) span.__exit__(None, None, None) @@ -264,7 +264,7 @@ def _new_embeddings_create_common(f, *args, **kwargs): if "model" in kwargs: set_data_normalized(span, "ai.model_id", kwargs["model"]) try: - response = yield f(*args, **kwargs) + response = yield f, args, kwargs except Exception as e: _capture_exception(e) raise e from None From a8a21f8f058ccac1011acc71665cba3997bcf4ea Mon Sep 17 00:00:00 2001 From: Matej Vetrak Date: Wed, 4 Sep 2024 16:40:18 +0200 Subject: [PATCH 05/25] Handle generators correctly --- sentry_sdk/integrations/anthropic.py | 26 +++++++++++-- sentry_sdk/integrations/openai.py | 58 +++++++++++++++++++++++----- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 2595e8d764..6c2282fb72 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -176,8 +176,17 @@ def _wrap_message_create(f): def _execute_sync(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any gen = _sentry_patched_create_common(f, *args, **kwargs) - f, args, kwargs = next(gen) - return gen.send(f(*args, **kwargs)) + + try: + f, args, kwargs = next(gen) + except StopIteration as e: + return e.value + + try: + result = f(*args, **kwargs) + return gen.send(result) + except StopIteration as e: + return e.value @wraps(f) @ensure_integration_enabled(AnthropicIntegration, f) @@ -194,8 +203,17 @@ def _wrap_async_message_create(f): async def _execute_async(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any gen = _sentry_patched_create_common(f, *args, **kwargs) - f, args, kwargs = next(gen) - return gen.send(await f(*args, **kwargs)) + + try: + f, args, kwargs = next(gen) + except StopIteration as e: + return await e.value + + try: + result = await f(*args, **kwargs) + return gen.send(result) + except StopIteration as e: + return e.value @wraps(f) @ensure_integration_enabled(AnthropicIntegration, f) diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index 2903028471..ae060f2665 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -47,9 +47,11 @@ def __init__(self, include_prompts=True, tiktoken_encoding_name=None): def setup_once(): # type: () -> None Completions.create = _wrap_chat_completion_create(Completions.create) - AsyncCompletions.create = _wrap_chat_completion_create(AsyncCompletions.create) + AsyncCompletions.create = _wrap_async_chat_completion_create( + AsyncCompletions.create + ) Embeddings.create = _wrap_embeddings_create(Embeddings.create) - AsyncEmbeddings.create = _wrap_embeddings_create(AsyncEmbeddings.create) + AsyncEmbeddings.create = _wrap_async_embeddings_create(AsyncEmbeddings.create) def count_tokens(self, s): # type: (OpenAIIntegration, str) -> int @@ -212,8 +214,17 @@ def _wrap_chat_completion_create(f): def _execute_sync(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any gen = _new_chat_completion_common(f, *args, **kwargs) - f, args, kwargs = next(gen) - return gen.send(f(*args, **kwargs)) + + try: + f, args, kwargs = next(gen) + except StopIteration as e: + return e.value + + try: 
+ result = f(*args, **kwargs) + return gen.send(result) + except StopIteration as e: + return e.value @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) @@ -230,8 +241,17 @@ def _wrap_async_chat_completion_create(f): async def _execute_async(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any gen = _new_chat_completion_common(f, *args, **kwargs) - f, args, kwargs = next(gen) - return gen.send(await f(*args, **kwargs)) + + try: + f, args, kwargs = next(gen) + except StopIteration as e: + return await e.value + + try: + result = await f(*args, **kwargs) + return gen.send(result) + except StopIteration as e: + return e.value @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) @@ -295,8 +315,17 @@ def _wrap_embeddings_create(f): def _execute_sync(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any gen = _new_embeddings_create_common(f, *args, **kwargs) - f, args, kwargs = next(gen) - return gen.send(f(*args, **kwargs)) + + try: + f, args, kwargs = next(gen) + except StopIteration as e: + return e.value + + try: + result = f(*args, **kwargs) + return gen.send(result) + except StopIteration as e: + return e.value @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) @@ -312,8 +341,17 @@ def _wrap_async_embeddings_create(f): async def _execute_async(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any gen = _new_embeddings_create_common(f, *args, **kwargs) - f, args, kwargs = next(gen) - return gen.send(await f(*args, **kwargs)) + + try: + f, args, kwargs = next(gen) + except StopIteration as e: + return await e.value + + try: + result = await f(*args, **kwargs) + return gen.send(result) + except StopIteration as e: + return e.value @wraps(f) @ensure_integration_enabled(OpenAIIntegration, f) From ea63aaaba14dce697c54b7e1d50772d4bca94575 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Mon, 14 Oct 2024 15:37:32 +0200 Subject: [PATCH 06/25] Removed the ensure_integration_enabled --- sentry_sdk/integrations/anthropic.py | 12 +++++++----- sentry_sdk/integrations/openai.py | 24 +++++++++++++++++++----- tox.ini | 2 +- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index b761f6a571..974d20033f 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -191,14 +191,17 @@ def _execute_sync(f, *args, **kwargs): return e.value @wraps(f) - @ensure_integration_enabled(AnthropicIntegration, f) def _sentry_patched_create_sync(*args, **kwargs): # type: (*Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) + if integration is None or "messages" not in kwargs: + return f(*args, **kwargs) + return _execute_sync(f, *args, **kwargs) return _sentry_patched_create_sync - + def _wrap_async_message_create(f): # type: (Any) -> Any async def _execute_async(f, *args, **kwargs): @@ -220,10 +223,9 @@ async def _execute_async(f, *args, **kwargs): async def _sentry_patched_create_async(*args, **kwargs): # type: (*Any, **Any) -> Any integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) - if integration is None or "messages" not in kwargs: - return f(*args, **kwargs) - + return await f(*args, **kwargs) + return await _execute_async(f, *args, **kwargs) return _sentry_patched_create_async diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index f0cf793988..7968dcb63d 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -227,9 +227,13 @@ 
def _execute_sync(f, *args, **kwargs): return e.value @wraps(f) - @ensure_integration_enabled(OpenAIIntegration, f) def _sentry_patched_create_sync(*args, **kwargs): # type: (*Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None or "messages" not in kwargs: + # no "messages" means invalid call (in all versions of openai), let it return error + return f(*args, **kwargs) + return _execute_sync(f, *args, **kwargs) return _sentry_patched_create_sync @@ -253,9 +257,13 @@ async def _execute_async(f, *args, **kwargs): return e.value @wraps(f) - @ensure_integration_enabled(OpenAIIntegration, f) async def _sentry_patched_create_async(*args, **kwargs): # type: (*Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None or "messages" not in kwargs: + # no "messages" means invalid call (in all versions of openai), let it return error + return await f(*args, **kwargs) + return await _execute_async(f, *args, **kwargs) return _sentry_patched_create_async @@ -327,9 +335,12 @@ def _execute_sync(f, *args, **kwargs): return e.value @wraps(f) - @ensure_integration_enabled(OpenAIIntegration, f) def _sentry_patched_create_sync(*args, **kwargs): # type: (*Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None: + return f(*args, **kwargs) + return _execute_sync(f, *args, **kwargs) return _sentry_patched_create_sync @@ -353,9 +364,12 @@ async def _execute_async(f, *args, **kwargs): return e.value @wraps(f) - @ensure_integration_enabled(OpenAIIntegration, f) async def _sentry_patched_create_async(*args, **kwargs): # type: (*Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None: + return await f(*args, **kwargs) + return await _execute_async(f, *args, **kwargs) - return _sentry_patched_create_async \ No newline at end of file + return _sentry_patched_create_async diff --git a/tox.ini b/tox.ini index 8d54a0364b..b4205516b8 100644 --- a/tox.ini +++ b/tox.ini @@ -792,5 +792,5 @@ commands = [testenv:linters] commands = flake8 tests sentry_sdk - black --check tests sentry_sdk + black tests sentry_sdk mypy sentry_sdk From ed61be4276b443bf3d07513c2ffdf4f114d04255 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Mon, 14 Oct 2024 15:54:36 +0200 Subject: [PATCH 07/25] Make mypy happy --- sentry_sdk/integrations/anthropic.py | 6 ++++-- sentry_sdk/integrations/openai.py | 11 ++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 974d20033f..6f38cf0611 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -90,6 +90,10 @@ def _get_responses(content): def _sentry_patched_create_common(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) + if integration is None: + return f(*args, **kwargs) + if "messages" not in kwargs: return f(*args, **kwargs) @@ -115,8 +119,6 @@ def _sentry_patched_create_common(f, *args, **kwargs): span.__exit__(None, None, None) raise exc from None - integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) - with capture_internal_exceptions(): span.set_data(SPANDATA.AI_MODEL_ID, model) span.set_data(SPANDATA.AI_STREAMING, False) diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index 
7968dcb63d..9c40caa79d 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -116,6 +116,10 @@ def _calculate_chat_completion_usage( def _new_chat_completion_common(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None: + return f(*args, **kwargs) + if "messages" not in kwargs: # invalid call (in all versions of openai), let it return error return f(*args, **kwargs) @@ -144,8 +148,6 @@ def _new_chat_completion_common(f, *args, **kwargs): span.__exit__(None, None, None) raise e from None - integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) - with capture_internal_exceptions(): if should_send_default_pii() and integration.include_prompts: set_data_normalized(span, SPANDATA.AI_INPUT_MESSAGES, messages) @@ -271,12 +273,15 @@ async def _sentry_patched_create_async(*args, **kwargs): def _new_embeddings_create_common(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) + if integration is None: + return f(*args, **kwargs) + with sentry_sdk.start_span( op=consts.OP.OPENAI_EMBEDDINGS_CREATE, description="OpenAI Embedding Creation", origin=OpenAIIntegration.origin, ) as span: - integration = sentry_sdk.get_client().get_integration(OpenAIIntegration) if "input" in kwargs and ( should_send_default_pii() and integration.include_prompts ): From d3b48bf9dfa8914044b4d6b318273732385c1938 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Mon, 14 Oct 2024 15:56:43 +0200 Subject: [PATCH 08/25] Fix from other contributor pr --- sentry_sdk/integrations/anthropic.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 6f38cf0611..ad8de107ad 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -147,7 +147,8 @@ def new_iterator(): elif event.type == "content_block_start": pass elif event.type == "content_block_delta": - content_blocks.append(event.delta.text) + if hasattr(event.delta, "text"): + content_blocks.append(event.delta.text) elif event.type == "content_block_stop": pass elif event.type == "message_delta": From f837fab1fd7e1bac86dc8f027b74efe25b88755d Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Mon, 14 Oct 2024 15:57:20 +0200 Subject: [PATCH 09/25] whoops --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index b4205516b8..8d54a0364b 100644 --- a/tox.ini +++ b/tox.ini @@ -792,5 +792,5 @@ commands = [testenv:linters] commands = flake8 tests sentry_sdk - black tests sentry_sdk + black --check tests sentry_sdk mypy sentry_sdk From 708adfdd866b53b464a253e6530411a224dab77d Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 10:03:01 +0200 Subject: [PATCH 10/25] Make it work with async streaming responses --- sentry_sdk/integrations/anthropic.py | 57 ++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index ad8de107ad..732c765efe 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -90,6 +90,8 @@ def _get_responses(content): def _sentry_patched_create_common(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any + + # check requirements integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) if 
integration is None: return f(*args, **kwargs) @@ -102,9 +104,7 @@ def _sentry_patched_create_common(f, *args, **kwargs): except TypeError: return f(*args, **kwargs) - messages = list(kwargs["messages"]) - model = kwargs.get("model") - + # start span span = sentry_sdk.start_span( op=OP.ANTHROPIC_MESSAGES_CREATE, description="Anthropic messages create", @@ -112,6 +112,7 @@ def _sentry_patched_create_common(f, *args, **kwargs): ) span.__enter__() + # yield generator try: result = yield f, args, kwargs except Exception as exc: @@ -119,19 +120,63 @@ def _sentry_patched_create_common(f, *args, **kwargs): span.__exit__(None, None, None) raise exc from None + # add data to span and finish it + messages = list(kwargs["messages"]) + model = kwargs.get("model") + with capture_internal_exceptions(): span.set_data(SPANDATA.AI_MODEL_ID, model) span.set_data(SPANDATA.AI_STREAMING, False) + if should_send_default_pii() and integration.include_prompts: span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages) + if hasattr(result, "content"): if should_send_default_pii() and integration.include_prompts: span.set_data(SPANDATA.AI_RESPONSES, _get_responses(result.content)) _calculate_token_usage(result, span) span.__exit__(None, None, None) + elif hasattr(result, "_iterator"): old_iterator = result._iterator + async def async_new_iterator(): + # type: () -> Iterator[MessageStreamEvent] + input_tokens = 0 + output_tokens = 0 + content_blocks = [] + with capture_internal_exceptions(): + async for event in old_iterator: + if hasattr(event, "type"): + if event.type == "message_start": + usage = event.message.usage + input_tokens += usage.input_tokens + output_tokens += usage.output_tokens + elif event.type == "content_block_start": + pass + elif event.type == "content_block_delta": + if hasattr(event.delta, "text"): + content_blocks.append(event.delta.text) + elif event.type == "content_block_stop": + pass + elif event.type == "message_delta": + output_tokens += event.usage.output_tokens + elif event.type == "message_stop": + continue + + yield event + + if should_send_default_pii() and integration.include_prompts: + complete_message = "".join(content_blocks) + span.set_data( + SPANDATA.AI_RESPONSES, + [{"type": "text", "text": complete_message}], + ) + total_tokens = input_tokens + output_tokens + record_token_usage(span, input_tokens, output_tokens, total_tokens) + span.set_data(SPANDATA.AI_STREAMING, True) + span.__exit__(None, None, None) + def new_iterator(): # type: () -> Iterator[MessageStreamEvent] input_tokens = 0 @@ -168,7 +213,11 @@ def new_iterator(): span.set_data(SPANDATA.AI_STREAMING, True) span.__exit__(None, None, None) - result._iterator = new_iterator() + if str(type(result._iterator)) == "": + result._iterator = async_new_iterator() + else: + result._iterator = new_iterator() + else: span.set_data("unknown_response", True) span.__exit__(None, None, None) From 523bc0f5f1a46bb19a17ba40f3adfccfd6e4088e Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 10:08:11 +0200 Subject: [PATCH 11/25] Renamed for better readability --- sentry_sdk/integrations/anthropic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 732c765efe..f8ecd16a9e 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -45,7 +45,7 @@ def setup_once(): raise DidNotEnable("anthropic 0.16 or newer required.") Messages.create = _wrap_message_create(Messages.create) - 
AsyncMessages.create = _wrap_async_message_create(AsyncMessages.create) + AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) def _capture_exception(exc): @@ -254,7 +254,7 @@ def _sentry_patched_create_sync(*args, **kwargs): return _sentry_patched_create_sync -def _wrap_async_message_create(f): +def _wrap_message_create_async(f): # type: (Any) -> Any async def _execute_async(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any From a175ba16883eab57ee14ad164a24c16edfae7ed1 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 10:09:48 +0200 Subject: [PATCH 12/25] Renamed for consistency --- sentry_sdk/integrations/anthropic.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index f8ecd16a9e..e6aa22b69f 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -21,7 +21,7 @@ raise DidNotEnable("Anthropic not installed") if TYPE_CHECKING: - from typing import Any, Iterator + from typing import Any, Iterator, AsyncIterator from sentry_sdk.tracing import Span @@ -140,8 +140,8 @@ def _sentry_patched_create_common(f, *args, **kwargs): elif hasattr(result, "_iterator"): old_iterator = result._iterator - async def async_new_iterator(): - # type: () -> Iterator[MessageStreamEvent] + async def new_iterator_async(): + # type: () -> AsyncIterator[MessageStreamEvent] input_tokens = 0 output_tokens = 0 content_blocks = [] @@ -214,7 +214,7 @@ def new_iterator(): span.__exit__(None, None, None) if str(type(result._iterator)) == "": - result._iterator = async_new_iterator() + result._iterator = new_iterator_async() else: result._iterator = new_iterator() From 6c1c97dc3a7863ec4662b1abb606389d2beb5d8b Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 11:17:20 +0200 Subject: [PATCH 13/25] Moved duplicated code into functions --- sentry_sdk/integrations/anthropic.py | 132 +++++++++++++-------------- 1 file changed, 63 insertions(+), 69 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index e6aa22b69f..3d333a2ae8 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -75,7 +75,9 @@ def _calculate_token_usage(result, span): def _get_responses(content): # type: (list[Any]) -> list[dict[str, Any]] - """Get JSON of a Anthropic responses.""" + """ + Get JSON of a Anthropic responses. + """ responses = [] for item in content: if hasattr(item, "text"): @@ -88,11 +90,48 @@ def _get_responses(content): return responses +def _collect_ai_data(event, input_tokens, output_tokens, content_blocks): + """ + Count token usage and collect content blocks from the AI stream. + """ + with capture_internal_exceptions(): + if hasattr(event, "type"): + if event.type == "message_start": + usage = event.message.usage + input_tokens += usage.input_tokens + output_tokens += usage.output_tokens + elif event.type == "content_block_start": + pass + elif event.type == "content_block_delta": + if hasattr(event.delta, "text"): + content_blocks.append(event.delta.text) + elif event.type == "content_block_stop": + pass + elif event.type == "message_delta": + output_tokens += event.usage.output_tokens + + +def _add_ai_data_to_span( + span, integration, content_blocks, input_tokens, output_tokens +): + """ + Add token usage and content blocks to the span. 
+ """ + with capture_internal_exceptions(): + if should_send_default_pii() and integration.include_prompts: + complete_message = "".join(content_blocks) + span.set_data( + SPANDATA.AI_RESPONSES, + [{"type": "text", "text": complete_message}], + ) + total_tokens = input_tokens + output_tokens + record_token_usage(span, input_tokens, output_tokens, total_tokens) + span.set_data(SPANDATA.AI_STREAMING, True) + + def _sentry_patched_create_common(f, *args, **kwargs): # type: (Any, *Any, **Any) -> Any - - # check requirements - integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) + integration = kwargs.pop("integration") if integration is None: return f(*args, **kwargs) @@ -104,7 +143,6 @@ def _sentry_patched_create_common(f, *args, **kwargs): except TypeError: return f(*args, **kwargs) - # start span span = sentry_sdk.start_span( op=OP.ANTHROPIC_MESSAGES_CREATE, description="Anthropic messages create", @@ -112,7 +150,6 @@ def _sentry_patched_create_common(f, *args, **kwargs): ) span.__enter__() - # yield generator try: result = yield f, args, kwargs except Exception as exc: @@ -130,7 +167,7 @@ def _sentry_patched_create_common(f, *args, **kwargs): if should_send_default_pii() and integration.include_prompts: span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages) - + if hasattr(result, "content"): if should_send_default_pii() and integration.include_prompts: span.set_data(SPANDATA.AI_RESPONSES, _get_responses(result.content)) @@ -140,77 +177,36 @@ def _sentry_patched_create_common(f, *args, **kwargs): elif hasattr(result, "_iterator"): old_iterator = result._iterator - async def new_iterator_async(): - # type: () -> AsyncIterator[MessageStreamEvent] + def new_iterator(): + # type: () -> Iterator[MessageStreamEvent] input_tokens = 0 output_tokens = 0 content_blocks = [] - with capture_internal_exceptions(): - async for event in old_iterator: - if hasattr(event, "type"): - if event.type == "message_start": - usage = event.message.usage - input_tokens += usage.input_tokens - output_tokens += usage.output_tokens - elif event.type == "content_block_start": - pass - elif event.type == "content_block_delta": - if hasattr(event.delta, "text"): - content_blocks.append(event.delta.text) - elif event.type == "content_block_stop": - pass - elif event.type == "message_delta": - output_tokens += event.usage.output_tokens - elif event.type == "message_stop": - continue + for event in old_iterator: + _collect_ai_data(event, input_tokens, output_tokens, content_blocks) + if event.type != "message_stop": yield event - if should_send_default_pii() and integration.include_prompts: - complete_message = "".join(content_blocks) - span.set_data( - SPANDATA.AI_RESPONSES, - [{"type": "text", "text": complete_message}], - ) - total_tokens = input_tokens + output_tokens - record_token_usage(span, input_tokens, output_tokens, total_tokens) - span.set_data(SPANDATA.AI_STREAMING, True) + _add_ai_data_to_span( + span, integration, content_blocks, input_tokens, output_tokens + ) span.__exit__(None, None, None) - def new_iterator(): - # type: () -> Iterator[MessageStreamEvent] + async def new_iterator_async(): + # type: () -> AsyncIterator[MessageStreamEvent] input_tokens = 0 output_tokens = 0 content_blocks = [] - with capture_internal_exceptions(): - for event in old_iterator: - if hasattr(event, "type"): - if event.type == "message_start": - usage = event.message.usage - input_tokens += usage.input_tokens - output_tokens += usage.output_tokens - elif event.type == "content_block_start": - pass - elif 
event.type == "content_block_delta": - if hasattr(event.delta, "text"): - content_blocks.append(event.delta.text) - elif event.type == "content_block_stop": - pass - elif event.type == "message_delta": - output_tokens += event.usage.output_tokens - elif event.type == "message_stop": - continue + + async for event in old_iterator: + _collect_ai_data(event, input_tokens, output_tokens, content_blocks) + if event.type != "message_stop": yield event - if should_send_default_pii() and integration.include_prompts: - complete_message = "".join(content_blocks) - span.set_data( - SPANDATA.AI_RESPONSES, - [{"type": "text", "text": complete_message}], - ) - total_tokens = input_tokens + output_tokens - record_token_usage(span, input_tokens, output_tokens, total_tokens) - span.set_data(SPANDATA.AI_STREAMING, True) + _add_ai_data_to_span( + span, integration, content_blocks, input_tokens, output_tokens + ) span.__exit__(None, None, None) if str(type(result._iterator)) == "": @@ -246,8 +242,7 @@ def _execute_sync(f, *args, **kwargs): def _sentry_patched_create_sync(*args, **kwargs): # type: (*Any, **Any) -> Any integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) - if integration is None or "messages" not in kwargs: - return f(*args, **kwargs) + kwargs["integration"] = integration return _execute_sync(f, *args, **kwargs) @@ -275,8 +270,7 @@ async def _execute_async(f, *args, **kwargs): async def _sentry_patched_create_async(*args, **kwargs): # type: (*Any, **Any) -> Any integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) - if integration is None or "messages" not in kwargs: - return await f(*args, **kwargs) + kwargs["integration"] = integration return await _execute_async(f, *args, **kwargs) From a18e6a5e1275a677b600684685aa96efe1e6475d Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 11:20:51 +0200 Subject: [PATCH 14/25] Better comments --- sentry_sdk/integrations/anthropic.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 3d333a2ae8..d5e3966111 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -92,7 +92,7 @@ def _get_responses(content): def _collect_ai_data(event, input_tokens, output_tokens, content_blocks): """ - Count token usage and collect content blocks from the AI stream. + Count token usage and collect content blocks from the AI streaming response. """ with capture_internal_exceptions(): if hasattr(event, "type"): @@ -115,7 +115,7 @@ def _add_ai_data_to_span( span, integration, content_blocks, input_tokens, output_tokens ): """ - Add token usage and content blocks to the span. + Add token usage and content blocks from the AI streaming response to the span. 
""" with capture_internal_exceptions(): if should_send_default_pii() and integration.include_prompts: @@ -174,6 +174,7 @@ def _sentry_patched_create_common(f, *args, **kwargs): _calculate_token_usage(result, span) span.__exit__(None, None, None) + # Streaming response elif hasattr(result, "_iterator"): old_iterator = result._iterator From e263c73351aa68296b7a30a57ac4deb434d4c804 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 11:26:00 +0200 Subject: [PATCH 15/25] typing --- sentry_sdk/integrations/anthropic.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index d5e3966111..b5c8553f11 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -13,7 +13,7 @@ ) try: - from anthropic.resources import Messages, AsyncMessages + from anthropic.resources import AsyncMessages, Messages if TYPE_CHECKING: from anthropic.types import MessageStreamEvent @@ -21,7 +21,7 @@ raise DidNotEnable("Anthropic not installed") if TYPE_CHECKING: - from typing import Any, Iterator, AsyncIterator + from typing import Any, AsyncIterator, Iterator from sentry_sdk.tracing import Span @@ -91,6 +91,7 @@ def _get_responses(content): def _collect_ai_data(event, input_tokens, output_tokens, content_blocks): + # type: (MessageStreamEvent, int, int, list[str]) -> None """ Count token usage and collect content blocks from the AI streaming response. """ @@ -112,8 +113,9 @@ def _collect_ai_data(event, input_tokens, output_tokens, content_blocks): def _add_ai_data_to_span( - span, integration, content_blocks, input_tokens, output_tokens + span, integration, input_tokens, output_tokens, content_blocks ): + # type: (Span, AnthropicIntegration, int, int, list[str]) -> None """ Add token usage and content blocks from the AI streaming response to the span. 
""" @@ -182,7 +184,7 @@ def new_iterator(): # type: () -> Iterator[MessageStreamEvent] input_tokens = 0 output_tokens = 0 - content_blocks = [] + content_blocks = [] # type: list[str] for event in old_iterator: _collect_ai_data(event, input_tokens, output_tokens, content_blocks) @@ -190,7 +192,7 @@ def new_iterator(): yield event _add_ai_data_to_span( - span, integration, content_blocks, input_tokens, output_tokens + span, integration, input_tokens, output_tokens, content_blocks ) span.__exit__(None, None, None) @@ -198,7 +200,7 @@ async def new_iterator_async(): # type: () -> AsyncIterator[MessageStreamEvent] input_tokens = 0 output_tokens = 0 - content_blocks = [] + content_blocks = [] # type: list[str] async for event in old_iterator: _collect_ai_data(event, input_tokens, output_tokens, content_blocks) @@ -206,7 +208,7 @@ async def new_iterator_async(): yield event _add_ai_data_to_span( - span, integration, content_blocks, input_tokens, output_tokens + span, integration, input_tokens, output_tokens, content_blocks ) span.__exit__(None, None, None) From dd21096e5b0f9a2e5afc6b3ef5426f82482fa37b Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 13:05:24 +0200 Subject: [PATCH 16/25] Fixed counting tokens --- sentry_sdk/integrations/anthropic.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index b5c8553f11..31977894b8 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -111,6 +111,7 @@ def _collect_ai_data(event, input_tokens, output_tokens, content_blocks): elif event.type == "message_delta": output_tokens += event.usage.output_tokens + return input_tokens, output_tokens, content_blocks def _add_ai_data_to_span( span, integration, input_tokens, output_tokens, content_blocks @@ -187,7 +188,7 @@ def new_iterator(): content_blocks = [] # type: list[str] for event in old_iterator: - _collect_ai_data(event, input_tokens, output_tokens, content_blocks) + input_tokens, output_tokens, content_blocks = _collect_ai_data(event, input_tokens, output_tokens, content_blocks) if event.type != "message_stop": yield event @@ -203,7 +204,7 @@ async def new_iterator_async(): content_blocks = [] # type: list[str] async for event in old_iterator: - _collect_ai_data(event, input_tokens, output_tokens, content_blocks) + input_tokens, output_tokens, content_blocks = _collect_ai_data(event, input_tokens, output_tokens, content_blocks) if event.type != "message_stop": yield event From 35b372e55f605f7a5a884842bc578a97325f554c Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 13:07:05 +0200 Subject: [PATCH 17/25] linting --- sentry_sdk/integrations/anthropic.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 31977894b8..3a11f57984 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -91,7 +91,7 @@ def _get_responses(content): def _collect_ai_data(event, input_tokens, output_tokens, content_blocks): - # type: (MessageStreamEvent, int, int, list[str]) -> None + # type: (MessageStreamEvent, int, int, list[str]) -> tuple[int, int, list[str]] """ Count token usage and collect content blocks from the AI streaming response. 
""" @@ -111,7 +111,8 @@ def _collect_ai_data(event, input_tokens, output_tokens, content_blocks): elif event.type == "message_delta": output_tokens += event.usage.output_tokens - return input_tokens, output_tokens, content_blocks + return input_tokens, output_tokens, content_blocks + def _add_ai_data_to_span( span, integration, input_tokens, output_tokens, content_blocks @@ -188,7 +189,9 @@ def new_iterator(): content_blocks = [] # type: list[str] for event in old_iterator: - input_tokens, output_tokens, content_blocks = _collect_ai_data(event, input_tokens, output_tokens, content_blocks) + input_tokens, output_tokens, content_blocks = _collect_ai_data( + event, input_tokens, output_tokens, content_blocks + ) if event.type != "message_stop": yield event @@ -204,7 +207,9 @@ async def new_iterator_async(): content_blocks = [] # type: list[str] async for event in old_iterator: - input_tokens, output_tokens, content_blocks = _collect_ai_data(event, input_tokens, output_tokens, content_blocks) + input_tokens, output_tokens, content_blocks = _collect_ai_data( + event, input_tokens, output_tokens, content_blocks + ) if event.type != "message_stop": yield event From 54177765faf3b4215127ba8029dc050c86f629db Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 13:20:21 +0200 Subject: [PATCH 18/25] Fixed capturing of exceptions --- sentry_sdk/integrations/anthropic.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 3a11f57984..87e69a3113 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -154,12 +154,7 @@ def _sentry_patched_create_common(f, *args, **kwargs): ) span.__enter__() - try: - result = yield f, args, kwargs - except Exception as exc: - _capture_exception(exc) - span.__exit__(None, None, None) - raise exc from None + result = yield f, args, kwargs # add data to span and finish it messages = list(kwargs["messages"]) @@ -242,7 +237,12 @@ def _execute_sync(f, *args, **kwargs): return e.value try: - result = f(*args, **kwargs) + try: + result = f(*args, **kwargs) + except Exception as exc: + _capture_exception(exc) + raise exc from None + return gen.send(result) except StopIteration as e: return e.value @@ -270,7 +270,12 @@ async def _execute_async(f, *args, **kwargs): return await e.value try: - result = await f(*args, **kwargs) + try: + result = await f(*args, **kwargs) + except Exception as exc: + _capture_exception(exc) + raise exc from None + return gen.send(result) except StopIteration as e: return e.value From d8dd55ab12757f1c7a731a0936fdada2f53cc27e Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 15 Oct 2024 14:06:35 +0200 Subject: [PATCH 19/25] Added async test cases --- .../integrations/anthropic/test_anthropic.py | 362 +++++++++++++++++- tox.ini | 1 + 2 files changed, 362 insertions(+), 1 deletion(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 7e33ac831d..61069dc540 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,7 +1,7 @@ from unittest import mock import pytest -from anthropic import Anthropic, AnthropicError, Stream +from anthropic import AsyncAnthropic, Anthropic, AnthropicError, AsyncStream, Stream from anthropic.types import MessageDeltaUsage, TextDelta, Usage from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent from 

From d8dd55ab12757f1c7a731a0936fdada2f53cc27e Mon Sep 17 00:00:00 2001
From: Anton Pirker
Date: Tue, 15 Oct 2024 14:06:35 +0200
Subject: [PATCH 19/25] Added async test cases

---
 .../integrations/anthropic/test_anthropic.py | 362 +++++++++++++++++-
 tox.ini                                      |   1 +
 2 files changed, 362 insertions(+), 1 deletion(-)

diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py
index 7e33ac831d..61069dc540 100644
--- a/tests/integrations/anthropic/test_anthropic.py
+++ b/tests/integrations/anthropic/test_anthropic.py
@@ -1,7 +1,7 @@
 from unittest import mock
 
 import pytest
-from anthropic import Anthropic, AnthropicError, Stream
+from anthropic import AsyncAnthropic, Anthropic, AnthropicError, AsyncStream, Stream
 from anthropic.types import MessageDeltaUsage, TextDelta, Usage
 from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent
 from anthropic.types.content_block_start_event import ContentBlockStartEvent
@@ -48,6 +48,11 @@
 )
 
 
+async def async_iterator(values):
+    for value in values:
+        yield value
+
+
 @pytest.mark.parametrize(
     "send_default_pii, include_prompts",
     [
@@ -115,6 +120,74 @@ def test_nonstreaming_create_message(
     assert span["data"]["ai.streaming"] is False
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "send_default_pii, include_prompts",
+    [
+        (True, True),
+        (True, False),
+        (False, True),
+        (False, False),
+    ],
+)
+async def test_nonstreaming_create_message_async(
+    sentry_init, capture_events, send_default_pii, include_prompts
+):
+    sentry_init(
+        integrations=[AnthropicIntegration(include_prompts=include_prompts)],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+    client = AsyncAnthropic(api_key="z")
+    client.messages._post = mock.AsyncMock(return_value=EXAMPLE_MESSAGE)
+
+    messages = [
+        {
+            "role": "user",
+            "content": "Hello, Claude",
+        }
+    ]
+
+    with start_transaction(name="anthropic"):
+        response = await client.messages.create(
+            max_tokens=1024, messages=messages, model="model"
+        )
+
+    assert response == EXAMPLE_MESSAGE
+    usage = response.usage
+
+    assert usage.input_tokens == 10
+    assert usage.output_tokens == 20
+
+    assert len(events) == 1
+    (event,) = events
+
+    assert event["type"] == "transaction"
+    assert event["transaction"] == "anthropic"
+
+    assert len(event["spans"]) == 1
+    (span,) = event["spans"]
+
+    assert span["op"] == OP.ANTHROPIC_MESSAGES_CREATE
+    assert span["description"] == "Anthropic messages create"
+    assert span["data"][SPANDATA.AI_MODEL_ID] == "model"
+
+    if send_default_pii and include_prompts:
+        assert span["data"][SPANDATA.AI_INPUT_MESSAGES] == messages
+        assert span["data"][SPANDATA.AI_RESPONSES] == [
+            {"type": "text", "text": "Hi, I'm Claude."}
+        ]
+    else:
+        assert SPANDATA.AI_INPUT_MESSAGES not in span["data"]
+        assert SPANDATA.AI_RESPONSES not in span["data"]
+
+    assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 10
+    assert span["measurements"]["ai_completion_tokens_used"]["value"] == 20
+    assert span["measurements"]["ai_total_tokens_used"]["value"] == 30
+    assert span["data"]["ai.streaming"] is False
+
+
 @pytest.mark.parametrize(
     "send_default_pii, include_prompts",
     [
@@ -215,6 +288,109 @@ def test_streaming_create_message(
     assert span["data"]["ai.streaming"] is True
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "send_default_pii, include_prompts",
+    [
+        (True, True),
+        (True, False),
+        (False, True),
+        (False, False),
+    ],
+)
+async def test_streaming_create_message_async(
+    sentry_init, capture_events, send_default_pii, include_prompts
+):
+    client = AsyncAnthropic(api_key="z")
+    returned_stream = AsyncStream(cast_to=None, response=None, client=client)
+    returned_stream._iterator = async_iterator(
+        [
+            MessageStartEvent(
+                message=EXAMPLE_MESSAGE,
+                type="message_start",
+            ),
+            ContentBlockStartEvent(
+                type="content_block_start",
+                index=0,
+                content_block=TextBlock(type="text", text=""),
+            ),
+            ContentBlockDeltaEvent(
+                delta=TextDelta(text="Hi", type="text_delta"),
+                index=0,
+                type="content_block_delta",
+            ),
+            ContentBlockDeltaEvent(
+                delta=TextDelta(text="!", type="text_delta"),
+                index=0,
+                type="content_block_delta",
+            ),
+            ContentBlockDeltaEvent(
+                delta=TextDelta(text=" I'm Claude!", type="text_delta"),
+                index=0,
+                type="content_block_delta",
+            ),
+            ContentBlockStopEvent(type="content_block_stop", index=0),
+            MessageDeltaEvent(
+                delta=Delta(),
+                usage=MessageDeltaUsage(output_tokens=10),
+                type="message_delta",
+            ),
+        ]
+    )
+
+    sentry_init(
+        integrations=[AnthropicIntegration(include_prompts=include_prompts)],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+    client.messages._post = mock.AsyncMock(return_value=returned_stream)
+
+    messages = [
+        {
+            "role": "user",
+            "content": "Hello, Claude",
+        }
+    ]
+
+    with start_transaction(name="anthropic"):
+        message = await client.messages.create(
+            max_tokens=1024, messages=messages, model="model", stream=True
+        )
+
+        async for _ in message:
+            pass
+
+    assert message == returned_stream
+    assert len(events) == 1
+    (event,) = events
+
+    assert event["type"] == "transaction"
+    assert event["transaction"] == "anthropic"
+
+    assert len(event["spans"]) == 1
+    (span,) = event["spans"]
+
+    assert span["op"] == OP.ANTHROPIC_MESSAGES_CREATE
+    assert span["description"] == "Anthropic messages create"
+    assert span["data"][SPANDATA.AI_MODEL_ID] == "model"
+
+    if send_default_pii and include_prompts:
+        assert span["data"][SPANDATA.AI_INPUT_MESSAGES] == messages
+        assert span["data"][SPANDATA.AI_RESPONSES] == [
+            {"type": "text", "text": "Hi! I'm Claude!"}
+        ]
+
+    else:
+        assert SPANDATA.AI_INPUT_MESSAGES not in span["data"]
+        assert SPANDATA.AI_RESPONSES not in span["data"]
+
+    assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 10
+    assert span["measurements"]["ai_completion_tokens_used"]["value"] == 30
+    assert span["measurements"]["ai_total_tokens_used"]["value"] == 40
+    assert span["data"]["ai.streaming"] is True
+
+
 @pytest.mark.skipif(
     ANTHROPIC_VERSION < (0, 27),
     reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.",
@@ -345,6 +521,143 @@ def test_streaming_create_message_with_input_json_delta(
     assert span["data"]["ai.streaming"] is True
 
 
+@pytest.mark.asyncio
+@pytest.mark.skipif(
+    ANTHROPIC_VERSION < (0, 27),
+    reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.",
+)
+@pytest.mark.parametrize(
+    "send_default_pii, include_prompts",
+    [
+        (True, True),
+        (True, False),
+        (False, True),
+        (False, False),
+    ],
+)
+async def test_streaming_create_message_with_input_json_delta_async(
+    sentry_init, capture_events, send_default_pii, include_prompts
+):
+    client = AsyncAnthropic(api_key="z")
+    returned_stream = AsyncStream(cast_to=None, response=None, client=client)
+    returned_stream._iterator = async_iterator(
+        [
+            MessageStartEvent(
+                message=Message(
+                    id="msg_0",
+                    content=[],
+                    model="claude-3-5-sonnet-20240620",
+                    role="assistant",
+                    stop_reason=None,
+                    stop_sequence=None,
+                    type="message",
+                    usage=Usage(input_tokens=366, output_tokens=10),
+                ),
+                type="message_start",
+            ),
+            ContentBlockStartEvent(
+                type="content_block_start",
+                index=0,
+                content_block=ToolUseBlock(
+                    id="toolu_0", input={}, name="get_weather", type="tool_use"
+                ),
+            ),
+            ContentBlockDeltaEvent(
+                delta=InputJSONDelta(partial_json="", type="input_json_delta"),
+                index=0,
+                type="content_block_delta",
+            ),
+            ContentBlockDeltaEvent(
+                delta=InputJSONDelta(
+                    partial_json="{'location':", type="input_json_delta"
+                ),
+                index=0,
+                type="content_block_delta",
+            ),
+            ContentBlockDeltaEvent(
+                delta=InputJSONDelta(partial_json=" 'S", type="input_json_delta"),
+                index=0,
+                type="content_block_delta",
+            ),
+            ContentBlockDeltaEvent(
+                delta=InputJSONDelta(partial_json="an ", type="input_json_delta"),
+                index=0,
+                type="content_block_delta",
+            ),
+            ContentBlockDeltaEvent(
+                delta=InputJSONDelta(
+                    partial_json="Francisco, C", type="input_json_delta"
+                ),
+                index=0,
+                type="content_block_delta",
+            ),
+            ContentBlockDeltaEvent(
+                delta=InputJSONDelta(partial_json="A'}", type="input_json_delta"),
+                index=0,
+                type="content_block_delta",
+            ),
+            ContentBlockStopEvent(type="content_block_stop", index=0),
+            MessageDeltaEvent(
+                delta=Delta(stop_reason="tool_use", stop_sequence=None),
+                usage=MessageDeltaUsage(output_tokens=41),
+                type="message_delta",
+            ),
+        ]
+    )
+
+    sentry_init(
+        integrations=[AnthropicIntegration(include_prompts=include_prompts)],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+    client.messages._post = mock.AsyncMock(return_value=returned_stream)
+
+    messages = [
+        {
+            "role": "user",
+            "content": "What is the weather like in San Francisco?",
+        }
+    ]
+
+    with start_transaction(name="anthropic"):
+        message = await client.messages.create(
+            max_tokens=1024, messages=messages, model="model", stream=True
+        )
+
+        async for _ in message:
+            pass
+
+    assert message == returned_stream
+    assert len(events) == 1
+    (event,) = events
+
+    assert event["type"] == "transaction"
+    assert event["transaction"] == "anthropic"
+
+    assert len(event["spans"]) == 1
+    (span,) = event["spans"]
+
+    assert span["op"] == OP.ANTHROPIC_MESSAGES_CREATE
+    assert span["description"] == "Anthropic messages create"
+    assert span["data"][SPANDATA.AI_MODEL_ID] == "model"
+
+    if send_default_pii and include_prompts:
+        assert span["data"][SPANDATA.AI_INPUT_MESSAGES] == messages
+        assert span["data"][SPANDATA.AI_RESPONSES] == [
+            {"text": "", "type": "text"}
+        ]  # we do not record InputJSONDelta because it could contain PII
+
+    else:
+        assert SPANDATA.AI_INPUT_MESSAGES not in span["data"]
+        assert SPANDATA.AI_RESPONSES not in span["data"]
+
+    assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 366
+    assert span["measurements"]["ai_completion_tokens_used"]["value"] == 51
+    assert span["measurements"]["ai_total_tokens_used"]["value"] == 417
+    assert span["data"]["ai.streaming"] is True
+
+
 def test_exception_message_create(sentry_init, capture_events):
     sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0)
     events = capture_events()
@@ -364,6 +677,26 @@ def test_exception_message_create(sentry_init, capture_events):
     assert event["level"] == "error"
 
 
+@pytest.mark.asyncio
+async def test_exception_message_create_async(sentry_init, capture_events):
+    sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0)
+    events = capture_events()
+
+    client = AsyncAnthropic(api_key="z")
+    client.messages._post = mock.AsyncMock(
+        side_effect=AnthropicError("API rate limit reached")
+    )
+    with pytest.raises(AnthropicError):
+        await client.messages.create(
+            model="some-model",
+            messages=[{"role": "system", "content": "I'm throwing an exception"}],
+            max_tokens=1024,
+        )
+
+    (event,) = events
+    assert event["level"] == "error"
+
+
 def test_span_origin(sentry_init, capture_events):
     sentry_init(
         integrations=[AnthropicIntegration()],
@@ -388,3 +721,30 @@ def test_span_origin(sentry_init, capture_events):
 
     assert event["contexts"]["trace"]["origin"] == "manual"
     assert event["spans"][0]["origin"] == "auto.ai.anthropic"
+
+
+@pytest.mark.asyncio
+async def test_span_origin_async(sentry_init, capture_events):
+    sentry_init(
+        integrations=[AnthropicIntegration()],
+        traces_sample_rate=1.0,
+    )
+    events = capture_events()
+
+    client = AsyncAnthropic(api_key="z")
+    client.messages._post = mock.AsyncMock(return_value=EXAMPLE_MESSAGE)
+
+    messages = [
+        {
+            "role": "user",
+            "content": "Hello, Claude",
+        }
+    ]
+
+    with start_transaction(name="anthropic"):
+        await client.messages.create(max_tokens=1024, messages=messages, model="model")
+
+    (event,) = events
+
+    assert event["contexts"]["trace"]["origin"] == "manual"
+    assert event["spans"][0]["origin"] == "auto.ai.anthropic"
diff --git a/tox.ini b/tox.ini
index 8d54a0364b..1ad4fa23ec 100644
--- a/tox.ini
+++ b/tox.ini
@@ -316,6 +316,7 @@ deps =
   aiohttp-latest: pytest-asyncio
 
   # Anthropic
+  anthropic: pytest-asyncio
   anthropic-v0.25: anthropic~=0.25.0
   anthropic-v0.16: anthropic~=0.16.0
   anthropic-latest: anthropic

From 0ec5fe831adad70bf68654ef707e0a29e8a37ed5 Mon Sep 17 00:00:00 2001
From: Anton Pirker
Date: Tue, 15 Oct 2024 14:20:10 +0200
Subject: [PATCH 20/25] Make async work in python 3.7

---
 .../integrations/anthropic/test_anthropic.py | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py
index 61069dc540..8ce12e70f5 100644
--- a/tests/integrations/anthropic/test_anthropic.py
+++ b/tests/integrations/anthropic/test_anthropic.py
@@ -1,5 +1,14 @@
 from unittest import mock
 
+try:
+    from unittest.mock import AsyncMock
+except ImportError:
+
+    class AsyncMock(mock.MagicMock):
+        async def __call__(self, *args, **kwargs):
+            return super(AsyncMock, self).__call__(*args, **kwargs)
+
+
 import pytest
 from anthropic import AsyncAnthropic, Anthropic, AnthropicError, AsyncStream, Stream
 from anthropic.types import MessageDeltaUsage, TextDelta, Usage
@@ -140,7 +149,7 @@ async def test_nonstreaming_create_message_async(
     )
     events = capture_events()
     client = AsyncAnthropic(api_key="z")
-    client.messages._post = mock.AsyncMock(return_value=EXAMPLE_MESSAGE)
+    client.messages._post = AsyncMock(return_value=EXAMPLE_MESSAGE)
 
     messages = [
         {
@@ -344,7 +353,7 @@ async def test_streaming_create_message_async(
         send_default_pii=send_default_pii,
     )
     events = capture_events()
-    client.messages._post = mock.AsyncMock(return_value=returned_stream)
+    client.messages._post = AsyncMock(return_value=returned_stream)
 
     messages = [
         {
@@ -611,7 +620,7 @@ async def test_streaming_create_message_with_input_json_delta_async(
         send_default_pii=send_default_pii,
     )
     events = capture_events()
-    client.messages._post = mock.AsyncMock(return_value=returned_stream)
+    client.messages._post = AsyncMock(return_value=returned_stream)
 
     messages = [
         {
@@ -683,7 +692,7 @@ async def test_exception_message_create_async(sentry_init, capture_events):
     events = capture_events()
 
     client = AsyncAnthropic(api_key="z")
-    client.messages._post = mock.AsyncMock(
+    client.messages._post = AsyncMock(
         side_effect=AnthropicError("API rate limit reached")
     )
     with pytest.raises(AnthropicError):
@@ -732,7 +741,7 @@ async def test_span_origin_async(sentry_init, capture_events):
     events = capture_events()
 
     client = AsyncAnthropic(api_key="z")
-    client.messages._post = mock.AsyncMock(return_value=EXAMPLE_MESSAGE)
+    client.messages._post = AsyncMock(return_value=EXAMPLE_MESSAGE)
 
     messages = [
         {
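
The `AsyncMock` shim above exists because `unittest.mock.AsyncMock` was only added in
Python 3.8; on 3.7 the fallback subclasses `MagicMock` and turns `__call__` into a
coroutine, so the mock can be awaited either way. A hedged usage sketch:

    mocked = AsyncMock(return_value=42)
    assert await mocked() == 42      # awaitable on 3.7 and on 3.8+ alike
    mocked.assert_called_once()      # MagicMock call bookkeeping still applies

`side_effect=SomeError(...)` also behaves as expected, since the coroutine body
delegates to `MagicMock.__call__`.
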

From 298a4d86678ca30258f14891ee0a8d06a069e7f8 Mon Sep 17 00:00:00 2001
From: Anton Pirker
Date: Tue, 15 Oct 2024 16:48:47 +0200
Subject: [PATCH 21/25] Fixed error capturing

---
 sentry_sdk/integrations/openai.py | 36 ++++++++++++++++++++++---------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py
index 9c40caa79d..8d7ce14e1b 100644
--- a/sentry_sdk/integrations/openai.py
+++ b/sentry_sdk/integrations/openai.py
@@ -141,12 +141,8 @@ def _new_chat_completion_common(f, *args, **kwargs):
         origin=OpenAIIntegration.origin,
     )
     span.__enter__()
-    try:
-        res = yield f, args, kwargs
-    except Exception as e:
-        _capture_exception(e)
-        span.__exit__(None, None, None)
-        raise e from None
+
+    res = yield f, args, kwargs
 
     with capture_internal_exceptions():
         if should_send_default_pii() and integration.include_prompts:
@@ -223,7 +219,12 @@ def _execute_sync(f, *args, **kwargs):
             return e.value
 
     try:
-        result = f(*args, **kwargs)
+        try:
+            result = f(*args, **kwargs)
+        except Exception as e:
+            _capture_exception(e)
+            raise e from None
+
         return gen.send(result)
     except StopIteration as e:
         return e.value
@@ -253,7 +254,12 @@ async def _execute_async(f, *args, **kwargs):
             return await e.value
 
     try:
-        result = await f(*args, **kwargs)
+        try:
+            result = await f(*args, **kwargs)
+        except Exception as e:
+            _capture_exception(e)
+            raise e from None
+
         return gen.send(result)
     except StopIteration as e:
         return e.value
@@ -334,7 +340,12 @@ def _execute_sync(f, *args, **kwargs):
             return e.value
 
     try:
-        result = f(*args, **kwargs)
+        try:
+            result = f(*args, **kwargs)
+        except Exception as e:
+            _capture_exception(e)
+            raise e from None
+
         return gen.send(result)
     except StopIteration as e:
         return e.value
@@ -363,7 +374,12 @@ async def _execute_async(f, *args, **kwargs):
             return await e.value
 
     try:
-        result = await f(*args, **kwargs)
+        try:
+            result = await f(*args, **kwargs)
+        except Exception as e:
+            _capture_exception(e)
+            raise e from None
+
         return gen.send(result)
     except StopIteration as e:
         return e.value
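
On the `raise e from None` in the drivers above: re-raising with `from None`
suppresses implicit exception chaining, so the user sees the original API error rather
than a "During handling of the above exception..." trail through the instrumentation.
Capturing first and then re-raising keeps Sentry reporting without changing what the
caller observes. In outline:

    try:
        result = f(*args, **kwargs)
    except Exception as e:
        _capture_exception(e)   # report to Sentry before propagating
        raise e from None       # propagate without extra chaining noise
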

From b82cf994592d551203b7b6690e6b6ac3fe878b03 Mon Sep 17 00:00:00 2001
From: Anton Pirker
Date: Wed, 16 Oct 2024 10:09:57 +0200
Subject: [PATCH 22/25] Added async test cases for openai

---
 sentry_sdk/integrations/openai.py        |  43 ++-
 tests/integrations/openai/test_openai.py | 338 ++++++++++++++++++++++-
 tox.ini                                  |   1 +
 3 files changed, 378 insertions(+), 4 deletions(-)

diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py
index 8d7ce14e1b..663c326be7 100644
--- a/sentry_sdk/integrations/openai.py
+++ b/sentry_sdk/integrations/openai.py
@@ -15,7 +15,7 @@
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
-    from typing import Any, Iterable, List, Optional, Callable, Iterator
+    from typing import Any, Iterable, List, Optional, Callable, AsyncIterator, Iterator
 
     from sentry_sdk.tracing import Span
 
 try:
@@ -165,7 +165,7 @@ def _new_chat_completion_common(f, *args, **kwargs):
         elif hasattr(res, "_iterator"):
             data_buf: list[list[str]] = []  # one for each choice
 
-            old_iterator = res._iterator  # type: Iterator[ChatCompletionChunk]
+            old_iterator = res._iterator
 
             def new_iterator():
                 # type: () -> Iterator[ChatCompletionChunk]
@@ -200,7 +200,44 @@ def new_iterator():
                         )
                     span.__exit__(None, None, None)
 
-            res._iterator = new_iterator()
+            async def new_iterator_async():
+                # type: () -> AsyncIterator[ChatCompletionChunk]
+                with capture_internal_exceptions():
+                    async for x in old_iterator:
+                        if hasattr(x, "choices"):
+                            choice_index = 0
+                            for choice in x.choices:
+                                if hasattr(choice, "delta") and hasattr(
+                                    choice.delta, "content"
+                                ):
+                                    content = choice.delta.content
+                                    if len(data_buf) <= choice_index:
+                                        data_buf.append([])
+                                    data_buf[choice_index].append(content or "")
+                                choice_index += 1
+                        yield x
+                    if len(data_buf) > 0:
+                        all_responses = list(
+                            map(lambda chunk: "".join(chunk), data_buf)
+                        )
+                        if should_send_default_pii() and integration.include_prompts:
+                            set_data_normalized(
+                                span, SPANDATA.AI_RESPONSES, all_responses
+                            )
+                        _calculate_chat_completion_usage(
+                            messages,
+                            res,
+                            span,
+                            all_responses,
+                            integration.count_tokens,
+                        )
+                        span.__exit__(None, None, None)
+
+            if str(type(res._iterator)) == "<class 'async_generator'>":
+                res._iterator = new_iterator_async()
+            else:
+                res._iterator = new_iterator()
+
         else:
             set_data_normalized(span, "unknown_response", True)
             span.__exit__(None, None, None)
diff --git a/tests/integrations/openai/test_openai.py b/tests/integrations/openai/test_openai.py
index b0ffc9e768..12e4e39cf5 100644
--- a/tests/integrations/openai/test_openai.py
+++ b/tests/integrations/openai/test_openai.py
@@ -1,5 +1,5 @@
 import pytest
-from openai import OpenAI, Stream, OpenAIError
+from openai import AsyncOpenAI, OpenAI, AsyncStream, Stream, OpenAIError
 from openai.types import CompletionUsage, CreateEmbeddingResponse, Embedding
 from openai.types.chat import ChatCompletion, ChatCompletionMessage, ChatCompletionChunk
 from openai.types.chat.chat_completion import Choice
@@ -11,6 +11,14 @@
 from unittest import mock  # python 3.3 and above
 
+try:
+    from unittest.mock import AsyncMock
+except ImportError:
+
+    class AsyncMock(mock.MagicMock):
+        async def __call__(self, *args, **kwargs):
+            return super(AsyncMock, self).__call__(*args, **kwargs)
+
 
 EXAMPLE_CHAT_COMPLETION = ChatCompletion(
     id="chat-id",
@@ -34,6 +42,11 @@
 )
 
 
+async def async_iterator(values):
+    for value in values:
+        yield value
+
+
 @pytest.mark.parametrize(
     "send_default_pii, include_prompts",
     [(True, True), (True, False), (False, True), (False, False)],
@@ -78,6 +91,48 @@ def test_nonstreaming_chat_completion(
     assert span["measurements"]["ai_total_tokens_used"]["value"] == 30
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "send_default_pii, include_prompts",
+    [(True, True), (True, False), (False, True), (False, False)],
+)
+async def test_nonstreaming_chat_completion_async(
+    sentry_init, capture_events, send_default_pii, include_prompts
+):
+    sentry_init(
+        integrations=[OpenAIIntegration(include_prompts=include_prompts)],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+
+    client = AsyncOpenAI(api_key="z")
+    client.chat.completions._post = AsyncMock(return_value=EXAMPLE_CHAT_COMPLETION)
+
+    with start_transaction(name="openai tx"):
+        response = await client.chat.completions.create(
+            model="some-model", messages=[{"role": "system", "content": "hello"}]
+        )
+        response = response.choices[0].message.content
+
+    assert response == "the model response"
+    tx = events[0]
+    assert tx["type"] == "transaction"
+    span = tx["spans"][0]
+    assert span["op"] == "ai.chat_completions.create.openai"
+
+    if send_default_pii and include_prompts:
+        assert "hello" in span["data"]["ai.input_messages"]["content"]
+        assert "the model response" in span["data"]["ai.responses"]["content"]
+    else:
+        assert "ai.input_messages" not in span["data"]
+        assert "ai.responses" not in span["data"]
+
+    assert span["measurements"]["ai_completion_tokens_used"]["value"] == 10
+    assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 20
+    assert span["measurements"]["ai_total_tokens_used"]["value"] == 30
+
+
 def tiktoken_encoding_if_installed():
     try:
         import tiktoken  # type: ignore # noqa # pylint: disable=unused-import
@@ -176,6 +231,102 @@ def test_streaming_chat_completion(
         pass  # if tiktoken is not installed, we can't guarantee token usage will be calculated properly
 
 
+# noinspection PyTypeChecker
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "send_default_pii, include_prompts",
+    [(True, True), (True, False), (False, True), (False, False)],
+)
+async def test_streaming_chat_completion_async(
+    sentry_init, capture_events, send_default_pii, include_prompts
+):
+    sentry_init(
+        integrations=[
+            OpenAIIntegration(
+                include_prompts=include_prompts,
+                tiktoken_encoding_name=tiktoken_encoding_if_installed(),
+            )
+        ],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+
+    client = AsyncOpenAI(api_key="z")
+    returned_stream = AsyncStream(cast_to=None, response=None, client=client)
+    returned_stream._iterator = async_iterator(
+        [
+            ChatCompletionChunk(
+                id="1",
+                choices=[
+                    DeltaChoice(
+                        index=0, delta=ChoiceDelta(content="hel"), finish_reason=None
+                    )
+                ],
+                created=100000,
+                model="model-id",
+                object="chat.completion.chunk",
+            ),
+            ChatCompletionChunk(
+                id="1",
+                choices=[
+                    DeltaChoice(
+                        index=1, delta=ChoiceDelta(content="lo "), finish_reason=None
+                    )
+                ],
+                created=100000,
+                model="model-id",
+                object="chat.completion.chunk",
+            ),
+            ChatCompletionChunk(
+                id="1",
+                choices=[
+                    DeltaChoice(
+                        index=2,
+                        delta=ChoiceDelta(content="world"),
+                        finish_reason="stop",
+                    )
+                ],
+                created=100000,
+                model="model-id",
+                object="chat.completion.chunk",
+            ),
+        ]
+    )
+
+    client.chat.completions._post = AsyncMock(return_value=returned_stream)
+    with start_transaction(name="openai tx"):
+        response_stream = await client.chat.completions.create(
+            model="some-model", messages=[{"role": "system", "content": "hello"}]
+        )
+
+        response_string = ""
+        async for x in response_stream:
+            response_string += x.choices[0].delta.content
+
+    assert response_string == "hello world"
+    tx = events[0]
+    assert tx["type"] == "transaction"
+    span = tx["spans"][0]
+    assert span["op"] == "ai.chat_completions.create.openai"
+
+    if send_default_pii and include_prompts:
+        assert "hello" in span["data"]["ai.input_messages"]["content"]
+        assert "hello world" in span["data"]["ai.responses"]
+    else:
+        assert "ai.input_messages" not in span["data"]
+        assert "ai.responses" not in span["data"]
+
+    try:
+        import tiktoken  # type: ignore # noqa # pylint: disable=unused-import
+
+        assert span["measurements"]["ai_completion_tokens_used"]["value"] == 2
+        assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 1
+        assert span["measurements"]["ai_total_tokens_used"]["value"] == 3
+    except ImportError:
+        pass  # if tiktoken is not installed, we can't guarantee token usage will be calculated properly
+
+
 def test_bad_chat_completion(sentry_init, capture_events):
     sentry_init(integrations=[OpenAIIntegration()], traces_sample_rate=1.0)
     events = capture_events()
@@ -193,6 +344,24 @@ def test_bad_chat_completion(sentry_init, capture_events):
     assert event["level"] == "error"
 
 
+@pytest.mark.asyncio
+async def test_bad_chat_completion_async(sentry_init, capture_events):
+    sentry_init(integrations=[OpenAIIntegration()], traces_sample_rate=1.0)
+    events = capture_events()
+
+    client = AsyncOpenAI(api_key="z")
+    client.chat.completions._post = AsyncMock(
+        side_effect=OpenAIError("API rate limit reached")
+    )
+    with pytest.raises(OpenAIError):
+        await client.chat.completions.create(
+            model="some-model", messages=[{"role": "system", "content": "hello"}]
+        )
+
+    (event,) = events
+    assert event["level"] == "error"
+
+
 @pytest.mark.parametrize(
     "send_default_pii, include_prompts",
     [(True, True), (True, False), (False, True), (False, False)],
@@ -240,6 +409,54 @@ def test_embeddings_create(
     assert span["measurements"]["ai_total_tokens_used"]["value"] == 30
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "send_default_pii, include_prompts",
+    [(True, True), (True, False), (False, True), (False, False)],
+)
+async def test_embeddings_create_async(
+    sentry_init, capture_events, send_default_pii, include_prompts
+):
+    sentry_init(
+        integrations=[OpenAIIntegration(include_prompts=include_prompts)],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+
+    client = AsyncOpenAI(api_key="z")
+
+    returned_embedding = CreateEmbeddingResponse(
+        data=[Embedding(object="embedding", index=0, embedding=[1.0, 2.0, 3.0])],
+        model="some-model",
+        object="list",
+        usage=EmbeddingTokenUsage(
+            prompt_tokens=20,
+            total_tokens=30,
+        ),
+    )
+
+    client.embeddings._post = AsyncMock(return_value=returned_embedding)
+    with start_transaction(name="openai tx"):
+        response = await client.embeddings.create(
+            input="hello", model="text-embedding-3-large"
+        )
+
+    assert len(response.data[0].embedding) == 3
+
+    tx = events[0]
+    assert tx["type"] == "transaction"
+    span = tx["spans"][0]
+    assert span["op"] == "ai.embeddings.create.openai"
+    if send_default_pii and include_prompts:
+        assert "hello" in span["data"]["ai.input_messages"]
+    else:
+        assert "ai.input_messages" not in span["data"]
+
+    assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 20
+    assert span["measurements"]["ai_total_tokens_used"]["value"] == 30
+
+
 def test_span_origin_nonstreaming_chat(sentry_init, capture_events):
     sentry_init(
         integrations=[OpenAIIntegration()],
@@ -261,6 +478,28 @@ def test_span_origin_nonstreaming_chat(sentry_init, capture_events):
     assert event["spans"][0]["origin"] == "auto.ai.openai"
 
 
+@pytest.mark.asyncio
+async def test_span_origin_nonstreaming_chat_async(sentry_init, capture_events):
+    sentry_init(
+        integrations=[OpenAIIntegration()],
+        traces_sample_rate=1.0,
+    )
+    events = capture_events()
+
+    client = AsyncOpenAI(api_key="z")
+    client.chat.completions._post = AsyncMock(return_value=EXAMPLE_CHAT_COMPLETION)
+
+    with start_transaction(name="openai tx"):
+        await client.chat.completions.create(
+            model="some-model", messages=[{"role": "system", "content": "hello"}]
+        )
+
+    (event,) = events
+
+    assert event["contexts"]["trace"]["origin"] == "manual"
+    assert event["spans"][0]["origin"] == "auto.ai.openai"
+
+
 def test_span_origin_streaming_chat(sentry_init, capture_events):
     sentry_init(
         integrations=[OpenAIIntegration()],
@@ -311,6 +550,7 @@ def test_span_origin_streaming_chat(sentry_init, capture_events):
         response_stream = client.chat.completions.create(
             model="some-model", messages=[{"role": "system", "content": "hello"}]
         )
+
         "".join(map(lambda x: x.choices[0].delta.content, response_stream))
 
     (event,) = events
@@ -319,6 +559,72 @@ def test_span_origin_streaming_chat(sentry_init, capture_events):
     assert event["spans"][0]["origin"] == "auto.ai.openai"
 
 
+@pytest.mark.asyncio
+async def test_span_origin_streaming_chat_async(sentry_init, capture_events):
+    sentry_init(
+        integrations=[OpenAIIntegration()],
+        traces_sample_rate=1.0,
+    )
+    events = capture_events()
+
+    client = AsyncOpenAI(api_key="z")
+    returned_stream = AsyncStream(cast_to=None, response=None, client=client)
+    returned_stream._iterator = async_iterator(
+        [
+            ChatCompletionChunk(
+                id="1",
+                choices=[
+                    DeltaChoice(
+                        index=0, delta=ChoiceDelta(content="hel"), finish_reason=None
+                    )
+                ],
+                created=100000,
+                model="model-id",
+                object="chat.completion.chunk",
+            ),
+            ChatCompletionChunk(
+                id="1",
+                choices=[
+                    DeltaChoice(
+                        index=1, delta=ChoiceDelta(content="lo "), finish_reason=None
+                    )
+                ],
+                created=100000,
+                model="model-id",
+                object="chat.completion.chunk",
+            ),
+            ChatCompletionChunk(
+                id="1",
+                choices=[
+                    DeltaChoice(
+                        index=2,
+                        delta=ChoiceDelta(content="world"),
+                        finish_reason="stop",
+                    )
+                ],
+                created=100000,
+                model="model-id",
+                object="chat.completion.chunk",
+            ),
+        ]
+    )
+
+    client.chat.completions._post = AsyncMock(return_value=returned_stream)
+    with start_transaction(name="openai tx"):
+        response_stream = await client.chat.completions.create(
+            model="some-model", messages=[{"role": "system", "content": "hello"}]
+        )
+        async for _ in response_stream:
+            pass
+
+        # "".join(map(lambda x: x.choices[0].delta.content, response_stream))
+
+    (event,) = events
+
+    assert event["contexts"]["trace"]["origin"] == "manual"
+    assert event["spans"][0]["origin"] == "auto.ai.openai"
+
+
 def test_span_origin_embeddings(sentry_init, capture_events):
     sentry_init(
         integrations=[OpenAIIntegration()],
@@ -346,3 +652,33 @@ def test_span_origin_embeddings(sentry_init, capture_events):
 
     assert event["contexts"]["trace"]["origin"] == "manual"
     assert event["spans"][0]["origin"] == "auto.ai.openai"
+
+
+@pytest.mark.asyncio
+async def test_span_origin_embeddings_async(sentry_init, capture_events):
+    sentry_init(
+        integrations=[OpenAIIntegration()],
+        traces_sample_rate=1.0,
+    )
+    events = capture_events()
+
+    client = AsyncOpenAI(api_key="z")
+
+    returned_embedding = CreateEmbeddingResponse(
+        data=[Embedding(object="embedding", index=0, embedding=[1.0, 2.0, 3.0])],
+        model="some-model",
+        object="list",
+        usage=EmbeddingTokenUsage(
+            prompt_tokens=20,
+            total_tokens=30,
+        ),
+    )
+
+    client.embeddings._post = AsyncMock(return_value=returned_embedding)
+    with start_transaction(name="openai tx"):
+        await client.embeddings.create(input="hello", model="text-embedding-3-large")
+
+    (event,) = events
+
+    assert event["contexts"]["trace"]["origin"] == "manual"
+    assert event["spans"][0]["origin"] == "auto.ai.openai"
diff --git a/tox.ini b/tox.ini
index 1ad4fa23ec..e5cd0bd002 100644
--- a/tox.ini
+++ b/tox.ini
@@ -533,6 +533,7 @@ deps =
   loguru-latest: loguru
 
   # OpenAI
+  openai: pytest-asyncio
   openai-v1: openai~=1.0.0
   openai-v1: tiktoken~=0.6.0
   openai-latest: openai
span["measurements"]["ai_total_tokens_used"]["value"] == 30 +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [(True, True), (True, False), (False, True), (False, False)], +) +def test_embeddings_create_raises_error( + sentry_init, capture_events, send_default_pii, include_prompts +): + sentry_init( + integrations=[OpenAIIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + client = OpenAI(api_key="z") + + client.embeddings._post = mock.Mock( + side_effect=OpenAIError("API rate limit reached") + ) + + with pytest.raises(OpenAIError): + client.embeddings.create(input="hello", model="text-embedding-3-large") + + (event,) = events + assert event["level"] == "error" + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [(True, True), (True, False), (False, True), (False, False)], +) +async def test_embeddings_create_raises_error_async( + sentry_init, capture_events, send_default_pii, include_prompts +): + sentry_init( + integrations=[OpenAIIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + client = AsyncOpenAI(api_key="z") + + client.embeddings._post = AsyncMock( + side_effect=OpenAIError("API rate limit reached") + ) + + with pytest.raises(OpenAIError): + await client.embeddings.create(input="hello", model="text-embedding-3-large") + + (event,) = events + assert event["level"] == "error" + + def test_span_origin_nonstreaming_chat(sentry_init, capture_events): sentry_init( integrations=[OpenAIIntegration()], From a85112da175145f7d9eace1fb815584c454d97d6 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Wed, 16 Oct 2024 15:18:24 +0200 Subject: [PATCH 24/25] More tests --- tests/integrations/openai/test_openai.py | 126 ++++++++++++++++++++++- 1 file changed, 125 insertions(+), 1 deletion(-) diff --git a/tests/integrations/openai/test_openai.py b/tests/integrations/openai/test_openai.py index 73a9493b6b..5504da91f9 100644 --- a/tests/integrations/openai/test_openai.py +++ b/tests/integrations/openai/test_openai.py @@ -7,7 +7,10 @@ from openai.types.create_embedding_response import Usage as EmbeddingTokenUsage from sentry_sdk import start_transaction -from sentry_sdk.integrations.openai import OpenAIIntegration +from sentry_sdk.integrations.openai import ( + OpenAIIntegration, + _calculate_chat_completion_usage, +) from unittest import mock # python 3.3 and above @@ -737,3 +740,124 @@ async def test_span_origin_embeddings_async(sentry_init, capture_events): assert event["contexts"]["trace"]["origin"] == "manual" assert event["spans"][0]["origin"] == "auto.ai.openai" + + +def test_calculate_chat_completion_usage_a(): + span = mock.MagicMock() + + def count_tokens(msg): + return len(str(msg)) + + response = mock.MagicMock() + response.usage = mock.MagicMock() + response.usage.completion_tokens = 10 + response.usage.prompt_tokens = 20 + response.usage.total_tokens = 30 + messages = [] + streaming_message_responses = [] + + with mock.patch( + "sentry_sdk.integrations.openai.record_token_usage" + ) as mock_record_token_usage: + _calculate_chat_completion_usage( + messages, response, span, streaming_message_responses, count_tokens + ) + mock_record_token_usage.assert_called_once_with(span, 20, 10, 30) + + +def test_calculate_chat_completion_usage_b(): + span = mock.MagicMock() + + def count_tokens(msg): + return len(str(msg)) + + response = mock.MagicMock() + 

From a85112da175145f7d9eace1fb815584c454d97d6 Mon Sep 17 00:00:00 2001
From: Anton Pirker
Date: Wed, 16 Oct 2024 15:18:24 +0200
Subject: [PATCH 24/25] More tests

---
 tests/integrations/openai/test_openai.py | 126 ++++++++++++++++++++++-
 1 file changed, 125 insertions(+), 1 deletion(-)

diff --git a/tests/integrations/openai/test_openai.py b/tests/integrations/openai/test_openai.py
index 73a9493b6b..5504da91f9 100644
--- a/tests/integrations/openai/test_openai.py
+++ b/tests/integrations/openai/test_openai.py
@@ -7,7 +7,10 @@
 from openai.types.create_embedding_response import Usage as EmbeddingTokenUsage
 
 from sentry_sdk import start_transaction
-from sentry_sdk.integrations.openai import OpenAIIntegration
+from sentry_sdk.integrations.openai import (
+    OpenAIIntegration,
+    _calculate_chat_completion_usage,
+)
 
 from unittest import mock  # python 3.3 and above
 
@@ -737,3 +740,124 @@ async def test_span_origin_embeddings_async(sentry_init, capture_events):
 
     assert event["contexts"]["trace"]["origin"] == "manual"
     assert event["spans"][0]["origin"] == "auto.ai.openai"
+
+
+def test_calculate_chat_completion_usage_a():
+    span = mock.MagicMock()
+
+    def count_tokens(msg):
+        return len(str(msg))
+
+    response = mock.MagicMock()
+    response.usage = mock.MagicMock()
+    response.usage.completion_tokens = 10
+    response.usage.prompt_tokens = 20
+    response.usage.total_tokens = 30
+    messages = []
+    streaming_message_responses = []
+
+    with mock.patch(
+        "sentry_sdk.integrations.openai.record_token_usage"
+    ) as mock_record_token_usage:
+        _calculate_chat_completion_usage(
+            messages, response, span, streaming_message_responses, count_tokens
+        )
+        mock_record_token_usage.assert_called_once_with(span, 20, 10, 30)
+
+
+def test_calculate_chat_completion_usage_b():
+    span = mock.MagicMock()
+
+    def count_tokens(msg):
+        return len(str(msg))
+
+    response = mock.MagicMock()
+    response.usage = mock.MagicMock()
+    response.usage.completion_tokens = 10
+    response.usage.total_tokens = 10
+    messages = [
+        {"content": "one"},
+        {"content": "two"},
+        {"content": "three"},
+    ]
+    streaming_message_responses = []
+
+    with mock.patch(
+        "sentry_sdk.integrations.openai.record_token_usage"
+    ) as mock_record_token_usage:
+        _calculate_chat_completion_usage(
+            messages, response, span, streaming_message_responses, count_tokens
+        )
+        mock_record_token_usage.assert_called_once_with(span, 11, 10, 10)
+
+
+def test_calculate_chat_completion_usage_c():
+    span = mock.MagicMock()
+
+    def count_tokens(msg):
+        return len(str(msg))
+
+    response = mock.MagicMock()
+    response.usage = mock.MagicMock()
+    response.usage.prompt_tokens = 20
+    response.usage.total_tokens = 20
+    messages = []
+    streaming_message_responses = [
+        "one",
+        "two",
+        "three",
+    ]
+
+    with mock.patch(
+        "sentry_sdk.integrations.openai.record_token_usage"
+    ) as mock_record_token_usage:
+        _calculate_chat_completion_usage(
+            messages, response, span, streaming_message_responses, count_tokens
+        )
+        mock_record_token_usage.assert_called_once_with(span, 20, 11, 20)
+
+
+def test_calculate_chat_completion_usage_d():
+    span = mock.MagicMock()
+
+    def count_tokens(msg):
+        return len(str(msg))
+
+    response = mock.MagicMock()
+    response.usage = mock.MagicMock()
+    response.usage.prompt_tokens = 20
+    response.usage.total_tokens = 20
+    response.choices = [
+        mock.MagicMock(messge="one"),
+        mock.MagicMock(messge="two"),
+        mock.MagicMock(messge="three"),
+    ]
+    messages = []
+    streaming_message_responses = []
+
+    with mock.patch(
+        "sentry_sdk.integrations.openai.record_token_usage"
+    ) as mock_record_token_usage:
+        _calculate_chat_completion_usage(
+            messages, response, span, streaming_message_responses, count_tokens
+        )
+        mock_record_token_usage.assert_called_once_with(span, 20, None, 20)
+
+
+def test_calculate_chat_completion_usage_e():
+    span = mock.MagicMock()
+
+    def count_tokens(msg):
+        return len(str(msg))
+
+    response = mock.MagicMock()
+    messages = []
+    streaming_message_responses = None
+
+    with mock.patch(
+        "sentry_sdk.integrations.openai.record_token_usage"
+    ) as mock_record_token_usage:
+        _calculate_chat_completion_usage(
+            messages, response, span, streaming_message_responses, count_tokens
+        )
+        mock_record_token_usage.assert_called_once_with(span, None, None, None)

From b8439f571f969d5ea5e84b24ca37b22a82886e37 Mon Sep 17 00:00:00 2001
From: Anton Pirker
Date: Wed, 16 Oct 2024 15:45:29 +0200
Subject: [PATCH 25/25] typo

---
 tests/integrations/openai/test_openai.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/integrations/openai/test_openai.py b/tests/integrations/openai/test_openai.py
index 5504da91f9..011192e49f 100644
--- a/tests/integrations/openai/test_openai.py
+++ b/tests/integrations/openai/test_openai.py
@@ -828,9 +828,9 @@ def count_tokens(msg):
     response.usage.prompt_tokens = 20
     response.usage.total_tokens = 20
     response.choices = [
-        mock.MagicMock(messge="one"),
-        mock.MagicMock(messge="two"),
-        mock.MagicMock(messge="three"),
+        mock.MagicMock(message="one"),
+        mock.MagicMock(message="two"),
+        mock.MagicMock(message="three"),
     ]
     messages = []
     streaming_message_responses = []
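
Taken together, the series makes the wrapped `create` calls awaitable end to end. A
minimal usage sketch of what it enables (the DSN and API key below are placeholders):

    import asyncio

    import sentry_sdk
    from sentry_sdk.integrations.anthropic import AnthropicIntegration
    from anthropic import AsyncAnthropic

    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
        traces_sample_rate=1.0,
        send_default_pii=True,
        integrations=[AnthropicIntegration(include_prompts=True)],
    )

    async def main():
        client = AsyncAnthropic(api_key="...")  # placeholder key
        with sentry_sdk.start_transaction(name="anthropic"):
            # the patched AsyncMessages.create records the span automatically
            await client.messages.create(
                max_tokens=1024,
                model="claude-3-5-sonnet-20240620",
                messages=[{"role": "user", "content": "Hello, Claude"}],
            )

    asyncio.run(main())

The `OpenAIIntegration` path works the same way with `AsyncOpenAI`.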