Merge pull request #52 from AgentOps-AI/openai-update
Openai update
areibman authored Jan 3, 2024
2 parents de3ab0a + 02ae95a commit a6b8a15
Showing 9 changed files with 451 additions and 58 deletions.
36 changes: 36 additions & 0 deletions README.md
@@ -61,6 +61,42 @@ Refer to our [API documentation](http://docs.agentops.ai) for detailed instructi
|🔜 Regression testing | 🔜 Multi-agent framework visualization | | |


## Callback handlers

### Langchain
AgentOps works seamlessly with applications built using Langchain. To use the callback handler, first install Langchain as an optional dependency:
```shell
pip install agentops[langchain]
```

Then import the handler and pass it as a callback to both your LLM and your agent:

```python
import os
from langchain.chat_models import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from agentops import LangchainCallbackHandler

AGENTOPS_API_KEY = os.environ['AGENTOPS_API_KEY']
OPENAI_API_KEY = os.environ['OPENAI_API_KEY']
handler = LangchainCallbackHandler(api_key=AGENTOPS_API_KEY, tags=['Langchain Example'])

llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY,
callbacks=[handler],
model='gpt-3.5-turbo')

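# 'tools' is your list of Langchain tools, defined elsewhere in your application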
agent = initialize_agent(tools,
llm,
agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
callbacks=[handler], # You must pass in a callback handler to record your agent
handle_parsing_errors=True)
```

Check out the [Langchain Examples Notebook](./examples/langchain_examples.ipynb) for more details including Async handlers.
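If you are using Langchain's async agent APIs, the wiring is the same shape. Below is a minimal sketch, assuming the package also exports an async variant of the handler (the `AsyncLangchainCallbackHandler` name, the `tools` list, and the `arun` call are assumptions here; see the notebook for the exact usage):

```python
import asyncio
import os

from langchain.chat_models import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
# Assumption: an async handler is exported alongside the sync one.
from agentops import AsyncLangchainCallbackHandler


async def main():
    handler = AsyncLangchainCallbackHandler(
        api_key=os.environ['AGENTOPS_API_KEY'], tags=['Async Example'])

    llm = ChatOpenAI(openai_api_key=os.environ['OPENAI_API_KEY'],
                     callbacks=[handler],
                     model='gpt-3.5-turbo')

    agent = initialize_agent(tools,  # your Langchain tools, defined elsewhere
                             llm,
                             agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION,
                             callbacks=[handler],
                             handle_parsing_errors=True)

    await agent.arun("What is AgentOps?")

asyncio.run(main())
```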

### LlamaIndex
(Coming Soon)


### Why AgentOps? 🤔

1 change: 1 addition & 0 deletions agentops/helpers.py
@@ -1,6 +1,7 @@
from enum import Enum
import time
from datetime import datetime
from packaging.version import parse


def get_ISO_time():
270 changes: 224 additions & 46 deletions agentops/llm_tracker.py
@@ -1,23 +1,33 @@
import functools
import inspect
import sys
from importlib import import_module
from packaging.version import parse
from .event import Event
from .helpers import get_ISO_time


class LlmTracker:
SUPPORTED_APIS = {
'openai': (
"ChatCompletion.create",
"ChatCompletion.acreate",
)
'openai': {
'1.0.0': (
"chat.completions.create",
),
'0.0.0':
(
"ChatCompletion.create",
"ChatCompletion.acreate",
),
}
}

def __init__(self, client):
self.client = client
self.event_stream = None

def _handle_response_openai(self, response, kwargs, init_timestamp):
def _handle_response_v0_openai(self, response, kwargs, init_timestamp):
"""Handle responses for OpenAI versions <v1.0.0"""

def handle_stream_chunk(chunk):
try:
model = chunk['model']
@@ -41,90 +51,258 @@ def handle_stream_chunk(chunk):
self.event_stream.returns['content'] += token

if finish_reason:
if not self.event_stream.returns:
self.event_stream.returns = {}
self.event_stream.returns['finish_reason'] = finish_reason
# Update end_timestamp
self.event_stream.end_timestamp = get_ISO_time()
self.client.record(self.event_stream)
self.event_stream = None
except:
except Exception as e:
print(
f"Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")

# if the response is a generator, decorate the generator
if inspect.isasyncgen(response):
async def generator():
async def async_generator():
async for chunk in response:
handle_stream_chunk(chunk)

yield chunk
return generator()
return async_generator()

if inspect.isgenerator(response):
elif inspect.isgenerator(response):
def generator():
for chunk in response:
handle_stream_chunk(chunk)

yield chunk
return generator()

else:
# v0.0.0 responses are dicts
try:
self.client.record(Event(
event_type=response['object'],
params=kwargs,
result='Success',
returns={"content":
response['choices'][0]['message']['content']},
action_type='llm',
model=response['model'],
prompt=kwargs['messages'],
init_timestamp=init_timestamp,
prompt_tokens=response.get('usage',
{}).get('prompt_tokens'),
completion_tokens=response.get('usage',
{}).get('completion_tokens')
))
except:
# v1.0.0+ responses are objects
try:
self.client.record(Event(
event_type=response['object'],
event_type=response.object,
params=kwargs,
result='Success',
returns={"content":
response['choices'][0]['message']['content']},
returns={
# TODO: Will need to make the completion the key for content, splat out the model dump
"content": response.choices[0].message.model_dump()},
action_type='llm',
model=response['model'],
model=response.model,
prompt=kwargs['messages'],
init_timestamp=init_timestamp
init_timestamp=init_timestamp,
prompt_tokens=response.usage.prompt_tokens,
completion_tokens=response.usage.completion_tokens
))
except:
# Standard response
except Exception as e:
print(
f"Unable to parse response for LLM call {kwargs} - skipping upload to AgentOps")
f"Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")

return response

def _handle_response_v1_openai(self, response, kwargs, init_timestamp):
"""Handle responses for OpenAI versions >v1.0.0"""
from openai import Stream, AsyncStream
from openai.types.chat import ChatCompletionChunk
from openai.resources import AsyncCompletions

def handle_stream_chunk(chunk: ChatCompletionChunk):
try:
model = chunk.model
choices = chunk.choices
token = choices[0].delta.content
finish_reason = choices[0].finish_reason
function_call = choices[0].delta.function_call
tool_calls = choices[0].delta.tool_calls
role = choices[0].delta.role

if self.event_stream == None:
self.event_stream = Event(
event_type='openai chat completion stream',
params=kwargs,
result='Success',
returns={"finish_reason": None,
"content": token},
action_type='llm',
model=model,
prompt=kwargs["messages"],
init_timestamp=init_timestamp
)
else:
if token == None:
token = ''
self.event_stream.returns['content'] += token

return response
if finish_reason:
if not self.event_stream.returns:
self.event_stream.returns = {}
self.event_stream.returns['finish_reason'] = finish_reason
self.event_stream.returns['function_call'] = function_call
self.event_stream.returns['tool_calls'] = tool_calls
self.event_stream.returns['role'] = role
# Update end_timestamp
self.event_stream.end_timestamp = get_ISO_time()
self.client.record(self.event_stream)
self.event_stream = None
except Exception as e:
print(
f"Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")

def _override_method(self, api, original_method):
# if the response is a generator, decorate the generator
if isinstance(response, Stream):
def generator():
for chunk in response:
handle_stream_chunk(chunk)
yield chunk
return generator()

# For asynchronous AsyncStream
elif isinstance(response, AsyncStream):
async def async_generator():
async for chunk in response:
handle_stream_chunk(chunk)
yield chunk
return async_generator()

# For async AsyncCompletion
elif isinstance(response, AsyncCompletions):
async def async_generator():
async for chunk in response:
handle_stream_chunk(chunk)
yield chunk
return async_generator()

# v1.0.0+ responses are objects
try:
self.client.record(Event(
event_type=response.object,
params=kwargs,
result='Success',
returns={
# TODO: Will need to make the completion the key for content, splat out the model dump
"content": response.choices[0].message.model_dump()},
action_type='llm',
model=response.model,
prompt=kwargs['messages'],
init_timestamp=init_timestamp,
prompt_tokens=response.usage.prompt_tokens,
completion_tokens=response.usage.completion_tokens
))
# Standard response
except Exception as e:
print(
f"Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")

return response

def override_openai_v1_completion(self):
from openai.resources.chat import completions

# Store the original method
original_create = completions.Completions.create

# Define the patched function
def patched_function(*args, **kwargs):
init_timestamp = get_ISO_time()
# Call the original function with its original arguments
result = original_create(*args, **kwargs)
return self._handle_response_v1_openai(result, kwargs, init_timestamp)

# Override the original method with the patched one
completions.Completions.create = patched_function

def override_openai_v1_async_completion(self):
from openai.resources.chat import completions

# Store the original method
original_create = completions.AsyncCompletions.create
# Define the patched function

async def patched_function(*args, **kwargs):
# Call the original function with its original arguments
init_timestamp = get_ISO_time()
result = await original_create(*args, **kwargs)
return self._handle_response_v1_openai(result, kwargs, init_timestamp)

# Override the original method with the patched one
completions.AsyncCompletions.create = patched_function

def _override_method(self, api, method_path, module):
def handle_response(result, kwargs, init_timestamp):
if api == "openai":
return self._handle_response_openai(
result, kwargs, init_timestamp)
return self._handle_response_v0_openai(result, kwargs, init_timestamp)
return result

if inspect.iscoroutinefunction(original_method):
@functools.wraps(original_method)
async def async_method(*args, **kwargs):
init_timestamp = get_ISO_time()
response = await original_method(*args, **kwargs)
return handle_response(response, kwargs, init_timestamp)
return async_method
def wrap_method(original_method):
if inspect.iscoroutinefunction(original_method):
@functools.wraps(original_method)
async def async_method(*args, **kwargs):
init_timestamp = get_ISO_time()
response = await original_method(*args, **kwargs)
return handle_response(response, kwargs, init_timestamp)
return async_method

else:
@functools.wraps(original_method)
def sync_method(*args, **kwargs):
init_timestamp = get_ISO_time()
response = original_method(*args, **kwargs)
return handle_response(response, kwargs, init_timestamp)
return sync_method

method_parts = method_path.split(".")
original_method = functools.reduce(getattr, method_parts, module)
new_method = wrap_method(original_method)

if len(method_parts) == 1:
setattr(module, method_parts[0], new_method)
else:
@functools.wraps(original_method)
def sync_method(*args, **kwargs):
init_timestamp = get_ISO_time()
response = original_method(*args, **kwargs)
return handle_response(response, kwargs, init_timestamp)
return sync_method
parent = functools.reduce(getattr, method_parts[:-1], module)
setattr(parent, method_parts[-1], new_method)

def override_api(self, api):
"""
Overrides key methods of the specified API to record events.
"""
if api not in self.SUPPORTED_APIS:
raise ValueError(f"Unsupported API: {api}")
if api in sys.modules:
if api not in self.SUPPORTED_APIS:
raise ValueError(f"Unsupported API: {api}")

module = import_module(api)
module = import_module(api)

for method_path in self.SUPPORTED_APIS[api]:
method_parts = method_path.split(".")
original_method = functools.reduce(getattr, method_parts, module)
new_method = self._override_method(api, original_method)
# If openai 1.0.0+ is specified, patch the completions methods
if api == 'openai':
if hasattr(module, '__version__'):
module_version = parse(module.__version__)
if module_version >= parse('1.0.0'):
self.override_openai_v1_completion()
self.override_openai_v1_async_completion()

if len(method_parts) == 1:
setattr(module, method_parts[0], new_method)
else:
parent = functools.reduce(getattr, method_parts[:-1], module)
setattr(parent, method_parts[-1], new_method)
# Patch all methods in every API
if hasattr(module, '__version__'):
module_version = parse(module.__version__)
for version in sorted(self.SUPPORTED_APIS[api], key=parse, reverse=True):
if module_version >= parse(version):
for method_path in self.SUPPORTED_APIS[api][version]:
self._override_method(api, method_path, module)
break
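Taken together, `override_api` now checks the installed `openai` version: on 1.0.0+ it patches `chat.completions.create` (sync and async), otherwise it falls back to the legacy `ChatCompletion` methods. Here is a rough sketch of how the tracker could be exercised directly; the `RecorderStub` is a hypothetical stand-in for the AgentOps client, which normally constructs the tracker itself:

```python
# Illustrative only: exercising the version-aware patching above.
import openai
from agentops.llm_tracker import LlmTracker


class RecorderStub:
    """Hypothetical stand-in for the AgentOps client; just prints event types."""

    def record(self, event):
        print("recorded:", event.event_type)


tracker = LlmTracker(RecorderStub())
tracker.override_api('openai')  # inspects openai.__version__, patches the matching methods

client = openai.OpenAI()
client.chat.completions.create(
    model='gpt-3.5-turbo',
    messages=[{"role": "user", "content": "Hello"}],
)  # with openai>=1.0.0, the patched create() records an Event before returning
```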