From 98c5586b3e707105396c4a8b6efaf0a9523a4fa2 Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Mon, 8 Apr 2024 11:05:25 -0700 Subject: [PATCH] use httpx async client --- log10/_httpx_utils.py | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 6316091a..20e04d64 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -83,13 +83,37 @@ def _try_post_request(url: str, payload: dict = {}) -> httpx.Response: logger.error(f"Failed to insert in log10: {payload} with error {err}") +async def _try_post_request_async(url: str, payload: dict = {}) -> httpx.Response: + headers = { + "x-log10-token": _log10_config.token, + "x-log10-organization-id": _log10_config.org_id, + "Content-Type": "application/json", + } + payload["organization_id"] = _log10_config.org_id + try: + async with httpx.AsyncClient() as client: + res = await client.post(url, headers=headers, json=payload) + res.raise_for_status() + return res + except httpx.HTTPStatusError as http_err: + if "401" in str(http_err): + logger.error( + "Failed authorization. Please verify that LOG10_TOKEN and LOG10_ORG_ID are set correctly and try again." + + "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details" + ) + else: + logger.error(f"Failed with error: {http_err}") + except Exception as err: + logger.error(f"Failed to insert in log10: {payload} with error {err}") + + async def get_completion_id(request: Request): if "v1/chat/completions" not in str(request.url): logger.warning("Currently logging is only available for v1/chat/completions.") return completion_url = "/api/completions" - res = _try_post_request(url=f"{base_url}{completion_url}") + res = await _try_post_request_async(url=f"{base_url}{completion_url}") try: completion_id = res.json().get("completionID") except Exception as e: @@ -125,7 +149,7 @@ async def log_request(request: Request): } if get_log10_session_tags(): log_row["tags"] = get_log10_session_tags() - _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) + await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) class _LogResponse(Response): @@ -205,7 +229,7 @@ async def aiter_bytes(self, *args, **kwargs): } if get_log10_session_tags(): log_row["tags"] = get_log10_session_tags() - _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) + await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) class _LogTransport(httpx.AsyncBaseTransport): @@ -246,7 +270,7 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: } if get_log10_session_tags(): log_row["tags"] = get_log10_session_tags() - _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) + await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) return response elif response.headers.get("content-type") == "text/event-stream": return _LogResponse(