Skip to content

Commit

Permalink
use httpx async client
Browse files Browse the repository at this point in the history
  • Loading branch information
wenzhe-log10 committed Apr 8, 2024
1 parent d751355 commit 1cbef41
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions log10/_httpx_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,37 @@ def _try_post_request(url: str, payload: dict = {}) -> httpx.Response:
except Exception as err:
logger.error(f"Failed to insert in log10: {payload} with error {err}")

async def _try_post_request_async(url: str, payload: dict = {}) -> httpx.Response:
headers = {
"x-log10-token": _log10_config.token,
"x-log10-organization-id": _log10_config.org_id,
"Content-Type": "application/json",
}
payload["organization_id"] = _log10_config.org_id
try:
async with httpx.AsyncClient() as client:
res= await client.post(url, headers=headers, json=payload)
res.raise_for_status()
return res
except httpx.HTTPStatusError as http_err:
if "401" in str(http_err):
logger.error(
"Failed authorization. Please verify that LOG10_TOKEN and LOG10_ORG_ID are set correctly and try again."
+ "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details"
)
else:
logger.error(f"Failed with error: {http_err}")
except Exception as err:
logger.error(f"Failed to insert in log10: {payload} with error {err}")


async def get_completion_id(request: Request):
if "v1/chat/completions" not in str(request.url):
logger.warning("Currently logging is only available for v1/chat/completions.")
return

completion_url = "/api/completions"
res = _try_post_request(url=f"{base_url}{completion_url}")
res = await _try_post_request_async(url=f"{base_url}{completion_url}")
try:
completion_id = res.json().get("completionID")
except Exception as e:
Expand Down Expand Up @@ -125,7 +148,7 @@ async def log_request(request: Request):
}
if get_log10_session_tags():
log_row["tags"] = get_log10_session_tags()
_try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)


class _LogResponse(Response):
Expand Down Expand Up @@ -205,7 +228,7 @@ async def aiter_bytes(self, *args, **kwargs):
}
if get_log10_session_tags():
log_row["tags"] = get_log10_session_tags()
_try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)


class _LogTransport(httpx.AsyncBaseTransport):
Expand Down Expand Up @@ -246,7 +269,7 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
}
if get_log10_session_tags():
log_row["tags"] = get_log10_session_tags()
_try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
return response
elif response.headers.get("content-type") == "text/event-stream":
return _LogResponse(
Expand Down

0 comments on commit 1cbef41

Please sign in to comment.