
Streaming support #96

Merged 5 commits on Feb 8, 2024
17 changes: 17 additions & 0 deletions examples/logging/steaming.py
@@ -0,0 +1,17 @@
# from openai import OpenAI  # stock client
from log10.load import OpenAI  # drop-in replacement that also logs completions to log10

client = OpenAI()

response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Count to 200"}],
    temperature=0,
    stream=True,
)

for chunk in response:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)
print("")
102 changes: 102 additions & 0 deletions log10/load.py
@@ -28,6 +28,7 @@
token = os.environ.get("LOG10_TOKEN")
org_id = os.environ.get("LOG10_ORG_ID")


# log10, bigquery
target_service = os.environ.get("LOG10_DATA_STORE", "log10")

@@ -267,6 +268,88 @@ def log_sync(completion_url, func, **kwargs):
    return completionID


class StreamingResponseWrapper:
    """
    Wraps a streaming response object to log the final result and duration to log10.

    OpenAI v1 example:
    >>> from log10.load import OpenAI
    >>> client = OpenAI()
    >>> response = client.chat.completions.create(
    ...     model="gpt-3.5-turbo",
    ...     messages=[{"role": "user", "content": "Count to 200"}],
    ...     temperature=0,
    ...     stream=True,
    ... )
    >>> for chunk in response:
    ...     content = chunk.choices[0].delta.content
    ...     if content:
    ...         print(content, end="", flush=True)
    >>> print("")
    """

    def __init__(self, completion_url, completionID, response, partial_log_row):
        self.completionID = completionID
        self.completion_url = completion_url
        self.partial_log_row = partial_log_row
        self.response = response
        self.final_result = ""  # Accumulates the full completion text
        self.start_time = time.perf_counter()
        self.gpt_id = None
        self.model = None
        self.finish_reason = None

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self.response)
            content = chunk.choices[0].delta.content
            if content:
                # Here you can intercept and modify content if needed
                self.final_result += content  # Save the content
                self.model = chunk.model
                self.gpt_id = chunk.id
            else:
                self.finish_reason = chunk.choices[0].finish_reason

            # Yield the original (or modified) chunk
            return chunk
        except StopIteration as se:
            # The stream is exhausted: build a fake non-streaming response in
            # the OpenAI format from the accumulated chunks and log it.
            response = {
                "id": self.gpt_id,
                "object": "completion",
                "model": self.model,
                "choices": [
                    {
                        "index": 0,
                        "finish_reason": self.finish_reason,
                        "message": {
                            "role": "assistant",
                            "content": self.final_result,
                        },
                    }
                ],
            }
            self.partial_log_row["response"] = json.dumps(response)
            self.partial_log_row["duration"] = int((time.perf_counter() - self.start_time) * 1000)

            try:
                res = post_request(self.completion_url + "/" + self.completionID, self.partial_log_row)
                if res.status_code != 200:
                    logger.error(f"LOG10: failed to insert in log10: {self.partial_log_row} with error {res.text}")
            except Exception as e:
                traceback.print_tb(e.__traceback__)
                logging.warn(f"LOG10: failed to log: {e}. Skipping")
Collaborator
Suggested change:
-                logging.warn(f"LOG10: failed to log: {e}. Skipping")
+                logger.error(f"LOG10: failed to log: {e}. Skipping")

Be consistent. Also, we could consider raising the logging level even higher, to logger.critical for LOG10 failures (https://docs.python.org/3/library/logging.html#logging.Logger.critical), in a follow-up PR.

Contributor Author

I think we're using logging.warn in other places?

Collaborator

Yeah, it seems there's a mix of logger.error and logger.warn in load.py. We could do a cleanup in a follow-up PR.
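For reference on the point above, a minimal sketch (not log10 code) of the distinction being discussed: logging.warn is a deprecated alias for logging.warning on the root logger, while logger.error and logger.critical are calls on a named logger at progressively higher severities. Assuming a module-level named logger like the one load.py uses:

import logging

logger = logging.getLogger("log10")  # named logger; name assumed for illustration
logging.basicConfig(level=logging.DEBUG)

logging.warn("deprecated alias for logging.warning, on the root logger")
logger.warning("named-logger equivalent, at WARNING severity")
logger.error("higher severity; what the suggestion above switches to")
logger.critical("highest severity; the follow-up idea for LOG10 failures")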


            # Re-raise StopIteration so the consumer's loop terminates normally.
            raise se
Collaborator

Just to verify: this is the StopIteration, so we want to raise it to tell the user the stream has finished.

Contributor Author

Yep, that's right.
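To illustrate the point being verified, here is a minimal sketch of the pattern with hypothetical names, independent of log10: in Python's iterator protocol, __next__ signals exhaustion by raising StopIteration, so a wrapper that catches it to do one-time logging must re-raise it, or the consuming for-loop would never see the end of the stream.

class FinalizingIterator:
    """Wraps any iterator and runs a callback once, when it is exhausted."""

    def __init__(self, inner, on_done):
        self.inner = iter(inner)
        self.on_done = on_done  # hypothetical cleanup/logging callback

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self.inner)
        except StopIteration:
            self.on_done()  # e.g. post the accumulated result, as above
            raise  # re-raise so the consuming loop terminates

for n in FinalizingIterator([1, 2, 3], on_done=lambda: print("\ndone")):
    print(n, end=" ")
# prints: 1 2 3
# done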



def intercepting_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
@@ -359,6 +442,25 @@ def wrapper(*args, **kwargs):

                response = Anthropic.prepare_response(kwargs["prompt"], output, "text")
                kind = "completion"
            elif type(output).__name__ == "Stream":
                kind = "chat"  # Should be "stream", but we don't have that kind yet.
                return StreamingResponseWrapper(
                    completion_url=completion_url,
                    completionID=completionID,
                    response=response,
                    partial_log_row={
                        "response": response,
                        "status": "finished",
                        "stacktrace": json.dumps(stacktrace),
                        "kind": kind,
                        "orig_module": func.__module__,
                        "orig_qualname": func.__qualname__,
                        "request": json.dumps(kwargs),
                        "session_id": sessionID,
                        "tags": global_tags,
                    },
                )

            else:
                response = output
                kind = "chat" if output.object == "chat.completion" else "completion"
Expand Down