From 2f6a5f60f77432bb254dad2166816529d02a31f3 Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Thu, 12 Dec 2024 22:01:18 +0000
Subject: [PATCH] feat: Enhance Voyage AI provider with async support and
 improved error handling

- Add async embedding support
- Improve error handling and logging
- Add token counting from usage data
- Enhance type hints
- Maintain Python version warning

Fixes #461

Co-Authored-By: Alex Reibman
---
 agentops/llms/providers/voyage.py | 38 ++++++++++++++++++++++++-------
 1 file changed, 30 insertions(+), 8 deletions(-)

diff --git a/agentops/llms/providers/voyage.py b/agentops/llms/providers/voyage.py
index bebbb04a..e3c3f2f0 100644
--- a/agentops/llms/providers/voyage.py
+++ b/agentops/llms/providers/voyage.py
@@ -1,6 +1,7 @@
 import sys
 import json
-from typing import Optional, Callable
+import pprint
+from typing import Optional, Callable, Dict, Any
 
 from agentops.llms.providers.instrumented_provider import InstrumentedProvider
 from agentops.event import LLMEvent, ErrorEvent
@@ -44,8 +45,8 @@ def _check_python_version(self) -> bool:
         return sys.version_info >= (3, 9)
 
     def handle_response(
-        self, response: dict, kwargs: dict, init_timestamp: str, session: Optional[Session] = None
-    ) -> dict:
+        self, response: Dict[str, Any], kwargs: Dict[str, Any], init_timestamp: str, session: Optional[Session] = None
+    ) -> Dict[str, Any]:
         """Handle responses for Voyage AI embeddings.
 
         Args:
@@ -66,12 +67,23 @@ def handle_response(
             llm_event.model = kwargs.get("model")
             llm_event.prompt = kwargs.get("input")
             llm_event.agent_id = check_call_stack_for_agent_id()
-            llm_event.end_timestamp = get_ISO_time()
 
+            # Extract token counts if available
+            if usage := response.get("usage"):
+                llm_event.prompt_tokens = usage.get("prompt_tokens")
+                llm_event.completion_tokens = usage.get("completion_tokens")
+
+            llm_event.end_timestamp = get_ISO_time()
             self._safe_record(session, llm_event)
         except Exception as e:
             self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
-            logger.warning("Unable to parse response for Voyage call. Skipping upload to AgentOps\n")
+            kwargs_str = pprint.pformat(kwargs)
+            response_str = pprint.pformat(response)
+            logger.warning(
+                f"Unable to parse response for Voyage call. Skipping upload to AgentOps\n"
+                f"response:\n {response_str}\n"
+                f"kwargs:\n {kwargs_str}\n"
+            )
 
         return response
 
@@ -79,16 +91,24 @@ def override(self):
         """Override Voyage AI SDK methods with instrumented versions."""
         import voyageai
 
+        # Store original methods
         self.original_embed = voyageai.Client.embed
+        self.original_embed_async = voyageai.Client.aembed
 
-        def patched_function(self, *args, **kwargs):
+        def patched_embed(self, *args, **kwargs):
             init_timestamp = get_ISO_time()
             session = kwargs.pop("session", None)
-
             result = self.original_embed(*args, **kwargs)
             return self.handle_response(result, kwargs, init_timestamp, session=session)
 
-        voyageai.Client.embed = patched_function
+        async def patched_embed_async(self, *args, **kwargs):
+            init_timestamp = get_ISO_time()
+            session = kwargs.pop("session", None)
+            result = await self.original_embed_async(*args, **kwargs)
+            return self.handle_response(result, kwargs, init_timestamp, session=session)
+
+        voyageai.Client.embed = patched_embed
+        voyageai.Client.aembed = patched_embed_async
 
     def undo_override(self):
         """Restore original Voyage AI SDK methods."""
@@ -96,3 +116,5 @@ def undo_override(self):
 
         if self.original_embed:
             voyageai.Client.embed = self.original_embed
+        if self.original_embed_async:
+            voyageai.Client.aembed = self.original_embed_async
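
Usage sketch (illustrative only, not part of the patch above): a minimal stand-alone imitation of the wrap-and-restore pattern this change applies to voyageai.Client.embed and voyageai.Client.aembed, including the popped session kwarg and the token counts read from the response's usage field. FakeVoyageClient, the model name, and the printed lines are hypothetical stand-ins so the sketch runs without the Voyage SDK or an AgentOps session.

    import asyncio


    class FakeVoyageClient:
        """Stand-in for voyageai.Client so this sketch runs without the SDK."""

        def embed(self, texts, model=None):
            return {"model": model, "usage": {"prompt_tokens": len(texts)}}

        async def aembed(self, texts, model=None):
            return {"model": model, "usage": {"prompt_tokens": len(texts)}}


    # Keep references to the originals, mirroring override()/undo_override().
    original_embed = FakeVoyageClient.embed
    original_aembed = FakeVoyageClient.aembed


    def patched_embed(self, *args, **kwargs):
        session = kwargs.pop("session", None)  # same kwarg the patched wrappers pop
        result = original_embed(self, *args, **kwargs)
        if usage := result.get("usage"):  # token counting as in handle_response()
            print(f"sync embed, session={session}, prompt_tokens={usage.get('prompt_tokens')}")
        return result


    async def patched_aembed(self, *args, **kwargs):
        session = kwargs.pop("session", None)
        result = await original_aembed(self, *args, **kwargs)
        if usage := result.get("usage"):
            print(f"async embed, session={session}, prompt_tokens={usage.get('prompt_tokens')}")
        return result


    FakeVoyageClient.embed = patched_embed
    FakeVoyageClient.aembed = patched_aembed

    client = FakeVoyageClient()
    client.embed(["hello"], model="voyage-3", session="demo-session")
    asyncio.run(client.aembed(["hello", "world"], model="voyage-3", session="demo-session"))

    # Restore the originals, as undo_override() does.
    FakeVoyageClient.embed = original_embed
    FakeVoyageClient.aembed = original_aembed

The restore step mirrors undo_override(): holding module-level references to the original functions before reassigning the class attributes is what makes the monkey patch reversible.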