Add support for async calls in Anthropic and OpenAI integration #3497

Merged
merged 34 commits into master from async-llm on Oct 17, 2024
Changes shown below are from the first 6 of the 34 commits, not the final merged diff.
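
For context, a minimal sketch of what the new async path makes possible. This is an illustrative usage example, not code from the PR; the DSN, model name, and sampling settings are placeholders, and an ANTHROPIC_API_KEY is assumed to be set in the environment:

import asyncio

import sentry_sdk
from anthropic import AsyncAnthropic

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,
    send_default_pii=True,  # prompts/responses are only recorded with PII enabled
)


async def main():
    # With this PR, AsyncMessages.create is patched, so the awaited call below
    # is wrapped in an "Anthropic messages create" span automatically.
    client = AsyncAnthropic()
    message = await client.messages.create(
        model="claude-3-opus-20240229",  # illustrative model name
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello, Claude"}],
    )
    print(message.content)


asyncio.run(main())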
Commits (34)
8a1b40a
Add support for async calls in Anthropic, OpenAI integrations
vetyy Sep 4, 2024
3979682
Fix types
vetyy Sep 4, 2024
99387d5
Add exec_sync and exec_async to correctly handle function execution i…
vetyy Sep 4, 2024
cd71e10
Correctly yield f, args, kwargs
vetyy Sep 4, 2024
a8a21f8
Handle generators correctly
vetyy Sep 4, 2024
f722562
Merge branch 'master' into async-llm
antonpirker Sep 6, 2024
9b00313
Merge branch 'master' into async-llm
antonpirker Oct 14, 2024
ea63aaa
Removed the ensure_integration_enabled
antonpirker Oct 14, 2024
ed61be4
Make mypy happy
antonpirker Oct 14, 2024
d3b48bf
Fix from other contributor pr
antonpirker Oct 14, 2024
f837fab
whoops
antonpirker Oct 14, 2024
708adfd
Make it work with async streaming responses
antonpirker Oct 15, 2024
523bc0f
Renamed for better readability
antonpirker Oct 15, 2024
a175ba1
Renamed for consistency
antonpirker Oct 15, 2024
6c1c97d
Moved duplicated code into functions
antonpirker Oct 15, 2024
a18e6a5
Better comments
antonpirker Oct 15, 2024
e263c73
typing
antonpirker Oct 15, 2024
dd21096
Fixed counting tokens
antonpirker Oct 15, 2024
35b372e
linting
antonpirker Oct 15, 2024
5417776
Fixed capturing of exceptions
antonpirker Oct 15, 2024
54c687b
Merge branch 'master' into async-llm
antonpirker Oct 15, 2024
d8dd55a
Added async test cases
antonpirker Oct 15, 2024
0ec5fe8
Make async work in Python 3.7
antonpirker Oct 15, 2024
298a4d8
Fixed error capturing
antonpirker Oct 15, 2024
fa46f61
Merge branch 'master' into async-llm
antonpirker Oct 15, 2024
fb1b22c
Merge branch 'master' into async-llm
antonpirker Oct 16, 2024
b82cf99
Added async test cases for openai
antonpirker Oct 16, 2024
bebbe8c
Merge branch 'async-llm' of github.com:vetyy/sentry-python into pr/ve…
antonpirker Oct 16, 2024
c9d4cf1
Merge branch 'master' into async-llm
antonpirker Oct 16, 2024
4a996af
More tests
antonpirker Oct 16, 2024
6cc5cab
Merge branch 'async-llm' of github.com:vetyy/sentry-python into pr/ve…
antonpirker Oct 16, 2024
a85112d
More tests
antonpirker Oct 16, 2024
b8439f5
typo
antonpirker Oct 16, 2024
67e0cd5
Merge branch 'master' into async-llm
antonpirker Oct 17, 2024
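
Before the file diff, it helps to see the pattern the refactor introduces: the shared span logic now lives in a single generator (_sentry_patched_create_common in the diff below) that yields the wrapped function and its arguments back to a driver, which performs the actual call, synchronously or with await, and sends the result back in. A simplified, self-contained sketch of that technique, using hypothetical names (instrument, run_sync, run_async) rather than the integration's real code:

def instrument(f, *args, **kwargs):
    # Shared logic: everything before the yield runs before the call,
    # everything after it runs after, identically for sync and async.
    print("before call")
    result = yield f, args, kwargs  # the driver performs the actual call
    print("after call")
    return result


def run_sync(f, *args, **kwargs):
    gen = instrument(f, *args, **kwargs)
    f, args, kwargs = next(gen)  # advance to the yield point
    try:
        gen.send(f(*args, **kwargs))  # plain call, then resume the generator
    except StopIteration as e:
        return e.value  # the generator's return value


async def run_async(f, *args, **kwargs):
    gen = instrument(f, *args, **kwargs)
    f, args, kwargs = next(gen)
    try:
        gen.send(await f(*args, **kwargs))  # the only difference: the await
    except StopIteration as e:
        return e.value


print(run_sync(lambda x: x * 2, 21))  # prints "before call", "after call", 42

This sketch omits the early-return and exception paths that the real _execute_sync and _execute_async handle; the payoff is that the span bookkeeping is written once instead of twice.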
227 changes: 136 additions & 91 deletions sentry_sdk/integrations/anthropic.py
@@ -15,14 +15,13 @@
 from typing import TYPE_CHECKING

 try:
-    from anthropic.resources import Messages
+    from anthropic.resources import Messages, AsyncMessages

     if TYPE_CHECKING:
         from anthropic.types import MessageStreamEvent
 except ImportError:
     raise DidNotEnable("Anthropic not installed")

-
 if TYPE_CHECKING:
     from typing import Any, Iterator
     from sentry_sdk.tracing import Span
@@ -48,6 +47,7 @@ def setup_once():
             raise DidNotEnable("anthropic 0.16 or newer required.")

         Messages.create = _wrap_message_create(Messages.create)
+        AsyncMessages.create = _wrap_async_message_create(AsyncMessages.create)


 def _capture_exception(exc):
@@ -75,105 +75,150 @@ def _calculate_token_usage(result, span):
     record_token_usage(span, input_tokens, output_tokens, total_tokens)


+def _sentry_patched_create_common(f, *args, **kwargs):
+    # type: (Any, *Any, **Any) -> Any
+    if "messages" not in kwargs:
+        return f(*args, **kwargs)
+
+    try:
+        iter(kwargs["messages"])
+    except TypeError:
+        return f(*args, **kwargs)
+
+    messages = list(kwargs["messages"])
+    model = kwargs.get("model")
+
+    span = sentry_sdk.start_span(
+        op=OP.ANTHROPIC_MESSAGES_CREATE,
+        description="Anthropic messages create",
+        origin=AnthropicIntegration.origin,
+    )
+    span.__enter__()
+
+    try:
+        result = yield f, args, kwargs
+    except Exception as exc:
+        _capture_exception(exc)
+        span.__exit__(None, None, None)
+        raise exc from None
+
+    integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
+
+    with capture_internal_exceptions():
+        span.set_data(SPANDATA.AI_MODEL_ID, model)
+        span.set_data(SPANDATA.AI_STREAMING, False)
+        if should_send_default_pii() and integration.include_prompts:
+            span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages)
+        if hasattr(result, "content"):
+            if should_send_default_pii() and integration.include_prompts:
+                span.set_data(
+                    SPANDATA.AI_RESPONSES,
+                    list(
+                        map(
+                            lambda message: {
+                                "type": message.type,
+                                "text": message.text,
+                            },
+                            result.content,
+                        )
+                    ),
+                )
+            _calculate_token_usage(result, span)
+            span.__exit__(None, None, None)
+        elif hasattr(result, "_iterator"):
+            old_iterator = result._iterator
+
+            def new_iterator():
+                # type: () -> Iterator[MessageStreamEvent]
+                input_tokens = 0
+                output_tokens = 0
+                content_blocks = []
+                with capture_internal_exceptions():
+                    for event in old_iterator:
+                        if hasattr(event, "type"):
+                            if event.type == "message_start":
+                                usage = event.message.usage
+                                input_tokens += usage.input_tokens
+                                output_tokens += usage.output_tokens
+                            elif event.type == "content_block_start":
+                                pass
+                            elif event.type == "content_block_delta":
+                                content_blocks.append(event.delta.text)
+                            elif event.type == "content_block_stop":
+                                pass
+                            elif event.type == "message_delta":
+                                output_tokens += event.usage.output_tokens
+                            elif event.type == "message_stop":
+                                continue
+                        yield event
+
+                    if should_send_default_pii() and integration.include_prompts:
+                        complete_message = "".join(content_blocks)
+                        span.set_data(
+                            SPANDATA.AI_RESPONSES,
+                            [{"type": "text", "text": complete_message}],
+                        )
+                    total_tokens = input_tokens + output_tokens
+                    record_token_usage(span, input_tokens, output_tokens, total_tokens)
+                    span.set_data(SPANDATA.AI_STREAMING, True)
+                    span.__exit__(None, None, None)
+
+            result._iterator = new_iterator()
+        else:
+            span.set_data("unknown_response", True)
+            span.__exit__(None, None, None)
+
+    return result
+
+
 def _wrap_message_create(f):
     # type: (Any) -> Any
+    def _execute_sync(f, *args, **kwargs):
+        # type: (Any, *Any, **Any) -> Any
+        gen = _sentry_patched_create_common(f, *args, **kwargs)
+
+        try:
+            f, args, kwargs = next(gen)
+        except StopIteration as e:
+            return e.value
+
+        try:
+            result = f(*args, **kwargs)
+            return gen.send(result)
+        except StopIteration as e:
+            return e.value
+
     @wraps(f)
     @ensure_integration_enabled(AnthropicIntegration, f)
-    def _sentry_patched_create(*args, **kwargs):
+    def _sentry_patched_create_sync(*args, **kwargs):
         # type: (*Any, **Any) -> Any
-        if "messages" not in kwargs:
-            return f(*args, **kwargs)

-        try:
-            iter(kwargs["messages"])
-        except TypeError:
-            return f(*args, **kwargs)
+        return _execute_sync(f, *args, **kwargs)

-        messages = list(kwargs["messages"])
-        model = kwargs.get("model")
+    return _sentry_patched_create_sync

-        span = sentry_sdk.start_span(
-            op=OP.ANTHROPIC_MESSAGES_CREATE,
-            description="Anthropic messages create",
-            origin=AnthropicIntegration.origin,
-        )
-        span.__enter__()

-        try:
-            result = f(*args, **kwargs)
-        except Exception as exc:
-            _capture_exception(exc)
-            span.__exit__(None, None, None)
-            raise exc from None
+def _wrap_async_message_create(f):
+    # type: (Any) -> Any
-
-        integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
+    async def _execute_async(f, *args, **kwargs):
+        # type: (Any, *Any, **Any) -> Any
+        gen = _sentry_patched_create_common(f, *args, **kwargs)

-        with capture_internal_exceptions():
-            span.set_data(SPANDATA.AI_MODEL_ID, model)
-            span.set_data(SPANDATA.AI_STREAMING, False)
-            if should_send_default_pii() and integration.include_prompts:
-                span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages)
-            if hasattr(result, "content"):
-                if should_send_default_pii() and integration.include_prompts:
-                    span.set_data(
-                        SPANDATA.AI_RESPONSES,
-                        list(
-                            map(
-                                lambda message: {
-                                    "type": message.type,
-                                    "text": message.text,
-                                },
-                                result.content,
-                            )
-                        ),
-                    )
-                _calculate_token_usage(result, span)
-                span.__exit__(None, None, None)
-            elif hasattr(result, "_iterator"):
-                old_iterator = result._iterator
-
-                def new_iterator():
-                    # type: () -> Iterator[MessageStreamEvent]
-                    input_tokens = 0
-                    output_tokens = 0
-                    content_blocks = []
-                    with capture_internal_exceptions():
-                        for event in old_iterator:
-                            if hasattr(event, "type"):
-                                if event.type == "message_start":
-                                    usage = event.message.usage
-                                    input_tokens += usage.input_tokens
-                                    output_tokens += usage.output_tokens
-                                elif event.type == "content_block_start":
-                                    pass
-                                elif event.type == "content_block_delta":
-                                    content_blocks.append(event.delta.text)
-                                elif event.type == "content_block_stop":
-                                    pass
-                                elif event.type == "message_delta":
-                                    output_tokens += event.usage.output_tokens
-                                elif event.type == "message_stop":
-                                    continue
-                            yield event
-
-                        if should_send_default_pii() and integration.include_prompts:
-                            complete_message = "".join(content_blocks)
-                            span.set_data(
-                                SPANDATA.AI_RESPONSES,
-                                [{"type": "text", "text": complete_message}],
-                            )
-                        total_tokens = input_tokens + output_tokens
-                        record_token_usage(
-                            span, input_tokens, output_tokens, total_tokens
-                        )
-                        span.set_data(SPANDATA.AI_STREAMING, True)
-                        span.__exit__(None, None, None)
+        try:
+            f, args, kwargs = next(gen)
+        except StopIteration as e:
+            return await e.value

-                result._iterator = new_iterator()
-            else:
-                span.set_data("unknown_response", True)
-                span.__exit__(None, None, None)
+        try:
+            result = await f(*args, **kwargs)
+            return gen.send(result)
+        except StopIteration as e:
+            return e.value

-        return result
+    @wraps(f)
+    @ensure_integration_enabled(AnthropicIntegration, f)
+    async def _sentry_patched_create_async(*args, **kwargs):
+        # type: (*Any, **Any) -> Any
+        return await _execute_async(f, *args, **kwargs)

-    return _sentry_patched_create
+    return _sentry_patched_create_async
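
A note on the streaming branch above: when the response exposes a _iterator, the integration swaps it for a wrapper generator that re-yields events to the caller while tallying token counts from message_start and message_delta events, recording the totals only once the stream is drained. A standalone sketch of that wrapping technique, with hypothetical dict-shaped events and names (wrap_stream, on_finish) instead of Anthropic's real MessageStreamEvent types:

def wrap_stream(events, on_finish):
    # Forward each event unchanged while accumulating usage counts;
    # on_finish fires only after the consumer has drained the stream.
    input_tokens = 0
    output_tokens = 0
    for event in events:
        if event["type"] == "message_start":
            input_tokens += event["usage"]["input_tokens"]
            output_tokens += event["usage"]["output_tokens"]
        elif event["type"] == "message_delta":
            output_tokens += event["usage"]["output_tokens"]
        yield event
    on_finish(input_tokens, output_tokens)


events = [
    {"type": "message_start", "usage": {"input_tokens": 10, "output_tokens": 1}},
    {"type": "message_delta", "usage": {"output_tokens": 5}},
]
for event in wrap_stream(events, print):
    pass  # consume as a caller would; prints "10 6" once the stream ends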