Skip to content

Commit

Permalink
LLM tracker cleanup (#35)
Browse files Browse the repository at this point in the history
* cleanup

* remove extra import

* reorg

* adjust arguments

---------

Co-authored-by: Shawn Qiu <siyangqiu@gmail.com>
  • Loading branch information
areibman and siyangqiu authored Oct 6, 2023
1 parent 7544955 commit 9907140
Showing 1 changed file with 69 additions and 87 deletions.
156 changes: 69 additions & 87 deletions agentops/llm_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,112 +21,94 @@ def __init__(self, client):
self.client = client
self.event_stream = None

def parse_and_record_event(self, api, result, kwargs, init_timestamp):
    """Record a completed (non-streaming) LLM call as an Event.

    Only the 'openai' API is recognised; any other value of *api* is a
    silent no-op. *result* is expected to be an OpenAI chat-completion
    payload; *kwargs* are the arguments the caller passed to the API.
    """
    if api != 'openai':
        return
    # Pull the assistant message text out of the completion payload.
    completion_text = result['choices'][0]['message']['content']
    self.client.record(Event(
        event_type=result.get('object'),
        params=kwargs,
        result='Success',
        returns={"content": completion_text},
        action_type='llm',
        model=result['model'],
        prompt=kwargs['messages'],
        init_timestamp=init_timestamp,
    ))

async def parse_and_record_async(self, api, result, kwargs, init_timestamp):
    """Async counterpart of :meth:`parse_and_record_event`.

    The original body was a byte-for-byte duplicate of the sync method and
    awaited nothing, so delegate to the sync implementation instead of
    maintaining two copies. Kept ``async`` so existing awaiting callers
    are unaffected.
    """
    self.parse_and_record_event(api, result, kwargs, init_timestamp)

def _handle_response_openai(self, response, kwargs, init_timestamp):
    """Record an OpenAI response, transparently handling streamed output.

    For a streaming *response* (sync or async generator), returns a wrapper
    generator that observes each chunk as the caller consumes it, folding
    the streamed tokens into a single accumulated Event that is recorded
    when the stream reports a finish_reason. For a plain completion dict,
    records one Event immediately and returns *response* unchanged.

    Recording is best-effort: parse failures are printed and skipped so the
    caller's API result is never broken by tracking.
    """
    def handle_stream_chunk(chunk):
        try:
            model = chunk['model']
            choices = chunk['choices']
            # Delta may omit 'content' (e.g. role-only first chunk).
            token = choices[0]['delta'].get('content', '')
            finish_reason = choices[0]['finish_reason']

            if self.event_stream is None:
                # First chunk of a new stream: open the accumulator Event.
                self.event_stream = Event(
                    event_type='openai stream',
                    params=kwargs,
                    result='Success',
                    returns={"finish_reason": None,
                             "content": token},
                    action_type='llm',
                    model=model,
                    prompt=kwargs["messages"],
                    init_timestamp=init_timestamp
                )
            else:
                self.event_stream.returns['content'] += token

            # finish_reason ('stop' or otherwise) marks the end of the stream.
            if finish_reason:
                self.event_stream.returns['finish_reason'] = finish_reason
                self.client.record(self.event_stream)
                self.event_stream = None
        except Exception:
            # Narrowed from a bare except: still best-effort, but no longer
            # swallows KeyboardInterrupt/SystemExit.
            print(
                f"Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")

    # If the response is a generator, decorate the generator so each chunk
    # is observed as it is consumed, then yielded through unchanged.
    if inspect.isasyncgen(response):
        async def async_generator():
            async for chunk in response:
                handle_stream_chunk(chunk)
                yield chunk
        return async_generator()

    if inspect.isgenerator(response):
        def sync_generator():
            for chunk in response:
                handle_stream_chunk(chunk)
                yield chunk
        return sync_generator()

    # Non-streaming: record the full completion as a single Event.
    try:
        self.client.record(Event(
            event_type=response['object'],
            params=kwargs,
            result='Success',
            returns={"content":
                     response['choices'][0]['message']['content']},
            action_type='llm',
            model=response['model'],
            prompt=kwargs['messages'],
            init_timestamp=init_timestamp
        ))
    except Exception:
        print(
            f"Unable to parse response for LLM call {kwargs} - skipping upload to AgentOps")

    return response

def _override_method(self, api, original_method):
    """
    Generate a new method (either async or sync) that overrides the original
    and records an event when called.

    The wrapper preserves the original callable's signature and metadata via
    functools.wraps, so patched API clients behave identically to callers.
    """
    def handle_response(result, kwargs, init_timestamp):
        # Dispatch on provider; unknown providers pass the result through
        # untouched so tracking never alters API behavior.
        if api == "openai":
            return self._handle_response_openai(
                result, kwargs, init_timestamp)
        return result

    if inspect.iscoroutinefunction(original_method):
        @functools.wraps(original_method)
        async def async_method(*args, **kwargs):
            init_timestamp = get_ISO_time()
            response = await original_method(*args, **kwargs)
            return handle_response(response, kwargs, init_timestamp)
        return async_method

    # Handle sync code
    else:
        @functools.wraps(original_method)
        def sync_method(*args, **kwargs):
            init_timestamp = get_ISO_time()
            response = original_method(*args, **kwargs)
            return handle_response(response, kwargs, init_timestamp)
        return sync_method

def override_api(self, api):
Expand Down

0 comments on commit 9907140

Please sign in to comment.