diff --git a/README.md b/README.md
index 94974112..f631d2d9 100644
--- a/README.md
+++ b/README.md
@@ -61,6 +61,42 @@ Refer to our [API documentation](http://docs.agentops.ai) for detailed instructi
 |🔜 Regression testing | 🔜 Multi-agent framework visualization | |
 
 
+## Callback handlers
+
+### Langchain
+AgentOps works seamlessly with applications built using Langchain. To use the handler, install Langchain as an optional dependency:
+```shell
+pip install agentops[langchain]
+```
+
+To use the handler, import and set
+
+```python
+import os
+from langchain.chat_models import ChatOpenAI
+from langchain.agents import initialize_agent, AgentType
+from agentops import LangchainCallbackHandler
+
+AGENTOPS_API_KEY = os.environ['AGENTOPS_API_KEY']
+handler = LangchainCallbackHandler(api_key=AGENTOPS_API_KEY, tags=['Langchain Example'])
+
+llm = ChatOpenAI(openai_api_key=os.environ['OPENAI_API_KEY'],
+                 callbacks=[handler],
+                 model='gpt-3.5-turbo')
+
+agent = initialize_agent(tools,
+                         llm,
+                         agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION,
+                         verbose=True,
+                         callbacks=[handler],  # You must pass in a callback handler to record your agent
+                         handle_parsing_errors=True)
+```
+
+Check out the [Langchain Examples Notebook](./examples/langchain_examples.ipynb) for more details, including async handlers.
+
+### LlamaIndex
+(Coming Soon)
+
 ### Why AgentOps? 🤔
 
diff --git a/agentops/helpers.py b/agentops/helpers.py
index fe918b36..055556a6 100644
--- a/agentops/helpers.py
+++ b/agentops/helpers.py
@@ -1,6 +1,7 @@
 from enum import Enum
 import time
 from datetime import datetime
+from packaging.version import parse
 
 
 def get_ISO_time():
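Both `helpers.py` and `llm_tracker.py` (next) lean on `packaging.version.parse` for the version gating below; `parse()` returns `Version` objects that compare numerically rather than lexicographically. A quick illustration of the comparisons the gate relies on (the version strings here are arbitrary examples, not from the patch):

```python
from packaging.version import parse

# Versions compare numerically, not as strings:
# '0.9.9' sorts after '0.28.1' as text, but 0.9.9 < 0.28.1 as a version.
assert parse('0.9.9') < parse('0.28.1')
assert parse('1.0.0') > parse('0.28.1')
assert parse('1.2.3') >= parse('1.0.0')
```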
diff --git a/agentops/llm_tracker.py b/agentops/llm_tracker.py
index 834c07d8..2cd177c0 100644
--- a/agentops/llm_tracker.py
+++ b/agentops/llm_tracker.py
@@ -1,23 +1,33 @@
 import functools
 import inspect
+import sys
 from importlib import import_module
+from packaging.version import parse
 from .event import Event
 from .helpers import get_ISO_time
 
 
 class LlmTracker:
     SUPPORTED_APIS = {
-        'openai': (
-            "ChatCompletion.create",
-            "ChatCompletion.acreate",
-        )
+        'openai': {
+            '1.0.0': (
+                "chat.completions.create",
+            ),
+            '0.0.0': (
+                "ChatCompletion.create",
+                "ChatCompletion.acreate",
+            ),
+        }
     }
 
     def __init__(self, client):
         self.client = client
         self.event_stream = None
 
-    def _handle_response_openai(self, response, kwargs, init_timestamp):
+    def _handle_response_v0_openai(self, response, kwargs, init_timestamp):
+        """Handle responses for OpenAI versions <v1.0.0"""
+
+    def _handle_response_v1_openai(self, response, kwargs, init_timestamp):
+        """Handle responses for OpenAI versions >=v1.0.0"""
+        from openai import Stream, AsyncStream
+        from openai.types.chat import ChatCompletionChunk
+        from openai.resources import AsyncCompletions
+
+        def handle_stream_chunk(chunk: ChatCompletionChunk):
+            try:
+                model = chunk.model
+                choices = chunk.choices
+                token = choices[0].delta.content
+                finish_reason = choices[0].finish_reason
+                function_call = choices[0].delta.function_call
+                tool_calls = choices[0].delta.tool_calls
+                role = choices[0].delta.role
+
+                if self.event_stream == None:
+                    self.event_stream = Event(
+                        event_type='openai chat completion stream',
+                        params=kwargs,
+                        result='Success',
+                        returns={"finish_reason": None,
+                                 "content": token},
+                        action_type='llm',
+                        model=model,
+                        prompt=kwargs["messages"],
+                        init_timestamp=init_timestamp
+                    )
+                else:
+                    if token == None:
+                        token = ''
+                    self.event_stream.returns['content'] += token
-        return response
+                if finish_reason:
+                    if not self.event_stream.returns:
+                        self.event_stream.returns = {}
+                    self.event_stream.returns['finish_reason'] = finish_reason
+                    self.event_stream.returns['function_call'] = function_call
+                    self.event_stream.returns['tool_calls'] = tool_calls
+                    self.event_stream.returns['role'] = role
+                    # Update end_timestamp
+                    self.event_stream.end_timestamp = get_ISO_time()
+                    self.client.record(self.event_stream)
+                    self.event_stream = None
+            except Exception as e:
+                print(
+                    f"Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
-    def _override_method(self, api, original_method):
+        # if the response is a generator, decorate the generator
+        if isinstance(response, Stream):
+            def generator():
+                for chunk in response:
+                    handle_stream_chunk(chunk)
+                    yield chunk
+            return generator()
+
+        # For asynchronous AsyncStream
+        elif isinstance(response, AsyncStream):
+            async def async_generator():
+                async for chunk in response:
+                    handle_stream_chunk(chunk)
+                    yield chunk
+            return async_generator()
+
+        # For async AsyncCompletion
+        elif isinstance(response, AsyncCompletions):
+            async def async_generator():
+                async for chunk in response:
+                    handle_stream_chunk(chunk)
+                    yield chunk
+            return async_generator()
+
+        # v1.0.0+ responses are objects
+        try:
+            self.client.record(Event(
+                event_type=response.object,
+                params=kwargs,
+                result='Success',
+                returns={
+                    # TODO: Will need to make the completion the key for content, splat out the model dump
+                    "content": response.choices[0].message.model_dump()},
+                action_type='llm',
+                model=response.model,
+                prompt=kwargs['messages'],
+                init_timestamp=init_timestamp,
+                prompt_tokens=response.usage.prompt_tokens,
+                completion_tokens=response.usage.completion_tokens
+            ))
+        # Standard response
+        except Exception as e:
+            print(
+                f"Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
+
+        return response
+
+    def override_openai_v1_completion(self):
+        from openai.resources.chat import completions
+
+        # Store the original method
+        original_create = completions.Completions.create
+
+        # Define the patched function
+        def patched_function(*args, **kwargs):
+            init_timestamp = get_ISO_time()
+            # Call the original function with its original arguments
+            result = original_create(*args, **kwargs)
+            return self._handle_response_v1_openai(result, kwargs, init_timestamp)
+
+        # Override the original method with the patched one
+        completions.Completions.create = patched_function
+
+    def override_openai_v1_async_completion(self):
+        from openai.resources.chat import completions
+
+        # Store the original method
+        original_create = completions.AsyncCompletions.create
+
+        # Define the patched function
+        async def patched_function(*args, **kwargs):
+            # Call the original function with its original arguments
+            init_timestamp = get_ISO_time()
+            result = await original_create(*args, **kwargs)
+            return self._handle_response_v1_openai(result, kwargs, init_timestamp)
+
+        # Override the original method with the patched one
+        completions.AsyncCompletions.create = patched_function
+
+    def _override_method(self, api, method_path, module):
         def handle_response(result, kwargs, init_timestamp):
             if api == "openai":
-                return self._handle_response_openai(
-                    result, kwargs, init_timestamp)
+                return self._handle_response_v0_openai(result, kwargs, init_timestamp)
             return result
 
-        if inspect.iscoroutinefunction(original_method):
-            @functools.wraps(original_method)
-            async def async_method(*args, **kwargs):
-                init_timestamp = get_ISO_time()
-                response = await original_method(*args, **kwargs)
-                return handle_response(response, kwargs, init_timestamp)
-            return async_method
+        def wrap_method(original_method):
+            if inspect.iscoroutinefunction(original_method):
+                @functools.wraps(original_method)
+                async def async_method(*args, **kwargs):
+                    init_timestamp = get_ISO_time()
+                    response = await original_method(*args, **kwargs)
+                    return handle_response(response, kwargs, init_timestamp)
+                return async_method
+            else:
+                @functools.wraps(original_method)
+                def sync_method(*args, **kwargs):
+                    init_timestamp = get_ISO_time()
+                    response = original_method(*args, **kwargs)
+                    return handle_response(response, kwargs, init_timestamp)
+                return sync_method
+
+        method_parts = method_path.split(".")
+        original_method = functools.reduce(getattr, method_parts, module)
+        new_method = wrap_method(original_method)
+
+        if len(method_parts) == 1:
+            setattr(module, method_parts[0], new_method)
         else:
-            @functools.wraps(original_method)
-            def sync_method(*args, **kwargs):
-                init_timestamp = get_ISO_time()
-                response = original_method(*args, **kwargs)
-                return handle_response(response, kwargs, init_timestamp)
-            return sync_method
+            parent = functools.reduce(getattr, method_parts[:-1], module)
+            setattr(parent, method_parts[-1], new_method)
 
     def override_api(self, api):
         """
         Overrides key methods of the specified API to record events.
         """
-        if api not in self.SUPPORTED_APIS:
-            raise ValueError(f"Unsupported API: {api}")
+        if api in sys.modules:
+            if api not in self.SUPPORTED_APIS:
+                raise ValueError(f"Unsupported API: {api}")
 
-        module = import_module(api)
+            module = import_module(api)
 
-        for method_path in self.SUPPORTED_APIS[api]:
-            method_parts = method_path.split(".")
-            original_method = functools.reduce(getattr, method_parts, module)
-            new_method = self._override_method(api, original_method)
+            # If openai 1.0.0+ is specified, patch the completions methods
+            if api == 'openai':
+                if hasattr(module, '__version__'):
+                    module_version = parse(module.__version__)
+                    if module_version >= parse('1.0.0'):
+                        self.override_openai_v1_completion()
+                        self.override_openai_v1_async_completion()
 
-            if len(method_parts) == 1:
-                setattr(module, method_parts[0], new_method)
-            else:
-                parent = functools.reduce(getattr, method_parts[:-1], module)
-                setattr(parent, method_parts[-1], new_method)
+            # Patch all methods in every API
+            if hasattr(module, '__version__'):
+                module_version = parse(module.__version__)
+                for version in sorted(self.SUPPORTED_APIS[api], key=parse, reverse=True):
+                    if module_version >= parse(version):
+                        for method_path in self.SUPPORTED_APIS[api][version]:
+                            self._override_method(api, method_path, module)
+                        break
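Every `override_*` method in this file uses the same wrap-and-reassign pattern: keep a reference to the original callable, install a wrapper that timestamps the call and records an event, then return the unmodified result so callers are unaffected. A minimal, self-contained sketch of the pattern, using a stand-in `Completions` class and `record` function rather than the real openai package:

```python
import functools
import time


class Completions:
    """Stand-in for an API client class (illustrative, not openai's)."""
    @staticmethod
    def create(**kwargs):
        return {"object": "chat.completion", "model": kwargs.get("model")}


def record(event):
    """Stand-in for Client.record (illustrative)."""
    print("recorded:", event)


original_create = Completions.create  # keep a reference to the original


@functools.wraps(original_create)
def patched_create(*args, **kwargs):
    start = time.time()  # analogue of init_timestamp
    result = original_create(*args, **kwargs)
    record({"params": kwargs, "latency_s": time.time() - start})
    return result  # caller receives the untouched response


Completions.create = patched_create  # reassign, as the overrides above do
Completions.create(model="gpt-3.5-turbo", messages=[])
```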
diff --git a/pyproject.toml b/pyproject.toml
index 9fb695dd..5397ea71 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "agentops"
-version = "0.0.11"
+version = "0.0.12"
 authors = [
     { name="Alex Reibman", email="areibman@gmail.com" },
     { name="Shawn Qiu", email="siyangqiu@gmail.com" }
@@ -18,13 +18,17 @@ classifiers = [
     "Operating System :: OS Independent",
 ]
 dependencies = [
-    "requests",
-    "pydantic"
+    "requests==2.31.0",
+    "pydantic==2.4.2",
+    "packaging==23.1"
 ]
 
 [project.optional-dependencies]
 dev = [
-    "pytest",
-    "requests_mock"
+    "pytest==7.4.0",
+    "requests_mock==1.11.0"
+]
+langchain = [
+    "langchain==0.0.350"
 ]
 
 [project.urls]
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 0d62c32f..00000000
--- a/requirements.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-pytest==7.4.0
-requests==2.31.0
-requests-mock==1.11.0
-pydantic==2.4.2
-langchain==0.0.350
\ No newline at end of file
diff --git a/tests/async_call_agent.py b/tests/openai_handlers/test_handler_openai_v0.py
similarity index 93%
rename from tests/async_call_agent.py
rename to tests/openai_handlers/test_handler_openai_v0.py
index 95f95e6c..c1ed32fa 100644
--- a/tests/async_call_agent.py
+++ b/tests/openai_handlers/test_handler_openai_v0.py
@@ -4,8 +4,10 @@
 import asyncio
 import os
 
-ao_client = agentops.Client(api_key='',
-                            tags=['mock tests'])
+print('Running OpenAI <v1.0.0')
+# Test that the correct methods are overridden for version >= 1.0.0
+
+
+def test_override_api_version_ge_1(mock_openai):
+    mock_openai.__version__ = '1.0.0'  # Version is exactly 1.0.0
+    tracker = LlmTracker(client=MagicMock())
+
+    original_method = MagicMock()
+    mock_openai.chat = MagicMock(completions=MagicMock(create=original_method))
+
+    tracker.override_api('openai')
+
+    # The original method should be replaced with a new method
+    assert mock_openai.chat.completions.create != original_method
+    assert callable(mock_openai.chat.completions.create)
+
+
+# Test that the correct methods are overridden for version < 1.0.0
+
+
+def test_override_api_version_lt_1(mock_openai):
+    mock_openai.__version__ = '0.9.9'  # Version is less than 1.0.0
+    tracker = LlmTracker(client=MagicMock())
+
+    original_method = MagicMock()
+    mock_openai.ChatCompletion = MagicMock(create=original_method)
+
+    tracker.override_api('openai')
+
+    # The original method should be replaced with a new method
+    assert mock_openai.ChatCompletion.create != original_method
+    assert callable(mock_openai.ChatCompletion.create)
+
+
+# Test that the override_api method handles missing __version__ attribute
+
+
+def test_override_api_missing_version_attribute(mocker):
+    mock_openai = mocker.MagicMock()
+    mocker.patch.dict('sys.modules', {'openai': mock_openai})
+    tracker = LlmTracker(client=MagicMock())
+
+    # This should not raise an error, and should use the methods for version < 1.0.0
+    tracker.override_api('openai')
+
+    # The methods for version < 1.0.0 should be overridden;
+    # 'ChatCompletion.create' is the method patched for version < 1.0.0
+    assert hasattr(mock_openai, 'ChatCompletion')
+    assert callable(mock_openai.ChatCompletion.create)
diff --git a/tests/test_session.py b/tests/test_session.py
index e4d7da05..4ebe0ac5 100644
--- a/tests/test_session.py
+++ b/tests/test_session.py
@@ -177,5 +177,25 @@ def foo(prompt=prompt):
         assert len(mock_req.request_history) == 1
         request_json = mock_req.last_request.json()
         assert request_json['events'][0]['action_type'] == 'action'
+        assert request_json['events'][0]['prompt'] == None
+        assert request_json['events'][0]['returns'] == 'output'
+        assert request_json['events'][0]['result'] == 'Success'
+
+    def test_llm_call_no_action_type(self, mock_req):
+        # Arrange
+        prompt = 'prompt'
+
+        @self.client.record_action(event_name=self.event_type)
+        def llm_call(prompt=prompt):
+            return 'output'
+
+        llm_call()
+        time.sleep(0.1)
+
+        # Assert
+        assert len(mock_req.request_history) == 1
+        request_json = mock_req.last_request.json()
+        assert request_json['events'][0]['action_type'] == 'action'
+        assert request_json['events'][0]['prompt'] == None
         assert request_json['events'][0]['returns'] == 'output'
         assert request_json['events'][0]['result'] == 'Success'
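The new tests pin down the dispatch rule in `override_api`: walk the supported versions newest-first and patch the method paths behind the first floor that the installed version clears. A standalone sketch of that selection rule, reusing the version table added to `SUPPORTED_APIS` (the helper name `methods_for` is hypothetical, not part of the library):

```python
from packaging.version import parse

# Same shape as the SUPPORTED_APIS['openai'] entry added in llm_tracker.py.
SUPPORTED = {
    '1.0.0': ("chat.completions.create",),
    '0.0.0': ("ChatCompletion.create", "ChatCompletion.acreate"),
}


def methods_for(installed: str):
    """Return the method paths to patch for an installed version string."""
    installed_version = parse(installed)
    # Newest-first: the first version floor we clear wins, then stop.
    for version in sorted(SUPPORTED, key=parse, reverse=True):
        if installed_version >= parse(version):
            return SUPPORTED[version]
    return ()


assert methods_for('1.2.3') == ("chat.completions.create",)
assert methods_for('0.28.1') == ("ChatCompletion.create", "ChatCompletion.acreate")
```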