diff --git a/examples/logging/steaming.py b/examples/logging/steaming.py
new file mode 100644
index 00000000..39ebba4f
--- /dev/null
+++ b/examples/logging/steaming.py
@@ -0,0 +1,17 @@
+# from openai import OpenAI
+from log10.load import OpenAI
+
+client = OpenAI()
+
+response = client.chat.completions.create(
+    model="gpt-3.5-turbo",
+    messages=[{"role": "user", "content": "Count to 200"}],
+    temperature=0,
+    stream=True,
+)
+
+for chunk in response:
+    content = chunk.choices[0].delta.content
+    if content:
+        print(content, end="", flush=True)
+print("")
diff --git a/log10/load.py b/log10/load.py
index 74eeaa81..018d79e9 100644
--- a/log10/load.py
+++ b/log10/load.py
@@ -28,6 +28,7 @@
 token = os.environ.get("LOG10_TOKEN")
 org_id = os.environ.get("LOG10_ORG_ID")
 
+
 # log10, bigquery
 target_service = os.environ.get("LOG10_DATA_STORE", "log10")
 
@@ -267,6 +268,88 @@ def log_sync(completion_url, func, **kwargs):
     return completionID
 
 
+class StreamingResponseWrapper:
+    """
+    Wraps a streaming response object to log the final result and duration to log10.
+
+    OpenAI v1 streaming example:
+
+    >>> from log10.load import OpenAI
+    >>> client = OpenAI()
+    >>> response = client.chat.completions.create(
+    ...     model="gpt-3.5-turbo",
+    ...     messages=[{"role": "user", "content": "Count to 200"}],
+    ...     temperature=0,
+    ...     stream=True,
+    ... )
+    >>> for chunk in response:
+    ...     content = chunk.choices[0].delta.content
+    ...     if content:
+    ...         print(content, end="", flush=True)
+    >>> print("")
+    """
+
+    def __init__(self, completion_url, completionID, response, partial_log_row):
+        self.completionID = completionID
+        self.completion_url = completion_url
+        self.partial_log_row = partial_log_row
+        self.response = response
+        self.final_result = ""  # Accumulates the full completion text
+        self.start_time = time.perf_counter()
+        self.gpt_id = None
+        self.model = None
+        self.finish_reason = None
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        try:
+            chunk = next(self.response)
+            content = chunk.choices[0].delta.content
+            if content:
+                # Accumulate the streamed content so the full completion can be logged
+                self.final_result += content
+
+                # Track the model and id reported by the stream
+                self.model = chunk.model
+                self.gpt_id = chunk.id
+            else:
+                self.finish_reason = chunk.choices[0].finish_reason
+
+            return chunk
+        except StopIteration as se:
+            # The stream is exhausted: log the final result.
+            # Build a response in the OpenAI completion format from the accumulated chunks.
+            response = {
+                "id": self.gpt_id,
+                "object": "completion",
+                "model": self.model,
+                "choices": [
+                    {
+                        "index": 0,
+                        "finish_reason": self.finish_reason,
+                        "message": {
+                            "role": "assistant",
+                            "content": self.final_result,
+                        },
+                    }
+                ],
+            }
+            self.partial_log_row["response"] = json.dumps(response)
+            self.partial_log_row["duration"] = int((time.perf_counter() - self.start_time) * 1000)
+
+            try:
+                res = post_request(self.completion_url + "/" + self.completionID, self.partial_log_row)
+                if res.status_code != 200:
+                    logger.error(f"LOG10: failed to insert in log10: {self.partial_log_row} with error {res.text}")
+            except Exception as e:
+                traceback.print_tb(e.__traceback__)
+                logger.warning(f"LOG10: failed to log: {e}. Skipping")
+
+            raise se
+
+
 def intercepting_decorator(func):
     @functools.wraps(func)
     def wrapper(*args, **kwargs):
@@ -359,6 +442,25 @@ def wrapper(*args, **kwargs):
                 response = Anthropic.prepare_response(kwargs["prompt"], output, "text")
                 kind = "completion"
+            elif type(output).__name__ == "Stream":
+                kind = "chat"  # Should be "stream", but we don't have that kind yet.
+                return StreamingResponseWrapper(
+                    completion_url=completion_url,
+                    completionID=completionID,
+                    response=response,
+                    partial_log_row={
+                        "response": response,
+                        "status": "finished",
+                        "stacktrace": json.dumps(stacktrace),
+                        "kind": kind,
+                        "orig_module": func.__module__,
+                        "orig_qualname": func.__qualname__,
+                        "request": json.dumps(kwargs),
+                        "session_id": sessionID,
+                        "tags": global_tags,
+                    },
+                )
+
             else:
                 response = output
                 kind = "chat" if output.object == "chat.completion" else "completion"
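
For reviewers skimming the patch, the heart of the change is the iterate-accumulate-finalize pattern in StreamingResponseWrapper.__next__: each chunk is handed back to the caller unchanged, its text is buffered, and the one-time logging work runs when the wrapped iterator raises StopIteration. Below is a minimal, dependency-free sketch of that pattern; it is illustrative only, not part of the patch, and LoggingStream / on_finish are hypothetical names.

# Illustrative sketch -- mirrors the wrapper's control flow, not part of the patch.
import time


class LoggingStream:
    """Pass chunks through unchanged, buffer their text, run a callback once on exhaustion."""

    def __init__(self, inner, on_finish):
        self.inner = iter(inner)
        self.on_finish = on_finish  # called once with (full_text, duration_ms)
        self.parts = []
        self.start = time.perf_counter()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self.inner)
        except StopIteration:
            # Stream exhausted: perform the one-time side effect, then re-raise so the
            # caller's for-loop terminates normally.
            duration_ms = int((time.perf_counter() - self.start) * 1000)
            self.on_finish("".join(self.parts), duration_ms)
            raise
        if chunk:
            self.parts.append(chunk)
        return chunk


# The consuming loop is identical to iterating the raw stream; logging is a side effect.
for piece in LoggingStream(["Hello", ", ", "world"], print):
    pass  # loop body does nothing; on_finish prints "Hello, world" and the duration

Re-raising StopIteration, rather than swallowing it, is what lets the wrapper be returned from the patched create() call as a drop-in replacement for the original stream object.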
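
A hedged test sketch for the new class, assuming the patch above is applied and pytest is installed. The fake chunks carry only the attributes the wrapper reads (choices[0].delta.content, choices[0].finish_reason, model, id), and post_request is stubbed out in log10.load's namespace, where the wrapper resolves it; the file name and helper names are hypothetical.

# test_streaming_wrapper.py -- hedged sketch, not part of the patch.
import json
from types import SimpleNamespace

import log10.load as load_module
from log10.load import StreamingResponseWrapper


def _chunk(content, finish_reason=None):
    # Minimal stand-in for an OpenAI streaming chunk: only the fields the wrapper touches.
    choice = SimpleNamespace(delta=SimpleNamespace(content=content), finish_reason=finish_reason)
    return SimpleNamespace(id="chatcmpl-test", model="gpt-3.5-turbo", choices=[choice])


def test_streaming_wrapper_logs_final_result(monkeypatch):
    captured = {}

    def fake_post_request(url, row):
        captured["url"] = url
        captured["row"] = row
        return SimpleNamespace(status_code=200, text="")

    monkeypatch.setattr(load_module, "post_request", fake_post_request)

    stream = iter([_chunk("Hello"), _chunk(", world"), _chunk(None, finish_reason="stop")])
    wrapped = StreamingResponseWrapper(
        completion_url="https://example.invalid/api/completions",  # hypothetical URL
        completionID="completion-123",
        response=stream,
        partial_log_row={"status": "finished", "kind": "chat"},
    )

    # Draining the wrapper yields the chunks unchanged...
    assert len(list(wrapped)) == 3

    # ...and exhausting it triggers exactly one POST with the stitched-together text.
    assert captured["url"].endswith("/completion-123")
    logged = json.loads(captured["row"]["response"])
    assert logged["choices"][0]["message"]["content"] == "Hello, world"
    assert logged["choices"][0]["finish_reason"] == "stop"
    assert captured["row"]["duration"] >= 0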