
Merge pull request #3 from NumexaHQ/async-and-observability-support
Async and Observability Support (Without Proxy)
dreamcode1994 authored Oct 8, 2023
2 parents c9ad3bb + 4830cc9 commit 15bdfcb
Showing 3 changed files with 87 additions and 31 deletions.
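For orientation before the diff: after this change the create() methods are coroutines, so callers drive them from an event loop. A minimal usage sketch, assuming the import path shown in the diff, a dict-shaped Message, and an already-configured API key/base URL:

import asyncio

from numexa.api_resources.apis import ChatCompletions  # path taken from the diff; package exports may differ

async def main() -> None:
    # create() is now async and must be awaited.
    response = await ChatCompletions.create(
        messages=[{"role": "user", "content": "Hello"}],  # Message shape is an assumption
    )
    print(response)

asyncio.run(main())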
36 changes: 18 additions & 18 deletions numexa/api_resources/apis.py
@@ -33,14 +33,14 @@ def __init__(self, client: APIClient) -> None:
# self._put = client.put
# self._delete = client.delete

- def _post(self, *args, **kwargs):
- return self._client.post(*args, **kwargs)
+ async def _post(self, *args, **kwargs):
+ return await self._client.post(*args, **kwargs)


class Completions(APIResource):
@classmethod
@overload
- def create(
+ async def create(
cls,
*,
prompt: Optional[str] = None,
@@ -56,7 +56,7 @@ def create(

@classmethod
@overload
- def create(
+ async def create(
cls,
*,
prompt: Optional[str] = None,
@@ -72,7 +72,7 @@ def create(

@classmethod
@overload
- def create(
+ async def create(
cls,
*,
prompt: Optional[str] = None,
@@ -87,7 +87,7 @@ def create(
...

@classmethod
- def create(
+ async def create(
cls,
*,
prompt: Optional[str] = None,
@@ -111,7 +111,7 @@ def create(
**kwargs,
)
if config.mode == Modes.SINGLE.value:
- return cls(_client)._post(
+ return await cls(_client)._post(
"/v1/complete",
body=config.llms,
mode=Modes.SINGLE.value,
@@ -121,7 +121,7 @@ def create(
stream=stream,
)
if config.mode == Modes.FALLBACK.value:
- return cls(_client)._post(
+ return await cls(_client)._post(
"/v1/complete",
body=config.llms,
mode=Modes.FALLBACK,
@@ -131,7 +131,7 @@ def create(
stream=stream,
)
if config.mode == Modes.AB_TEST.value:
- return cls(_client)._post(
+ return await cls(_client)._post(
"/v1/complete",
body=config.llms,
mode=Modes.AB_TEST,
@@ -146,7 +146,7 @@ def create(
class ChatCompletions(APIResource):
@classmethod
@overload
- def create(
+ async def create(
cls,
*,
messages: Optional[List[Message]] = None,
@@ -162,7 +162,7 @@ def create(

@classmethod
@overload
- def create(
+ async def create(
cls,
*,
messages: Optional[List[Message]] = None,
@@ -178,7 +178,7 @@ def create(

@classmethod
@overload
- def create(
+ async def create(
cls,
*,
messages: Optional[List[Message]] = None,
@@ -193,7 +193,7 @@ def create(
...

@classmethod
- def create(
+ async def create(
cls,
*,
messages: Optional[List[Message]] = None,
@@ -223,7 +223,7 @@ def create(
else:
url = "/chat/completions/direct"
if config.mode == Modes.SINGLE.value:
- return cls(_client)._post(
+ return await cls(_client)._post(
url,
body=config.llms,
mode=Modes.SINGLE.value,
@@ -233,7 +233,7 @@ def create(
stream=stream,
)
if config.mode == Modes.FALLBACK.value:
- return cls(_client)._post(
+ return await cls(_client)._post(
url,
body=config.llms,
mode=Modes.FALLBACK,
@@ -243,7 +243,7 @@ def create(
stream=stream,
)
if config.mode == Modes.AB_TEST.value:
- return cls(_client)._post(
+ return await cls(_client)._post(
url,
body=config.llms,
mode=Modes.AB_TEST,
@@ -257,7 +257,7 @@ def create(

class Generations(APIResource):
@classmethod
- def create(
+ async def create(
cls,
*,
prompt_id: str,
@@ -268,7 +268,7 @@ def create(
config = retrieve_config()
_client = APIClient(api_key=config.api_key, base_url=config.base_url)
body = {"variables": variables}
- return cls(_client)._post(
+ return await cls(_client)._post(
f"/v1/prompts/{prompt_id}/generate",
body=body,
mode=None,
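A practical consequence of the conversion above, sketched under the same assumptions about import path and configuration (prompt and variables are the parameters visible in the diff; the prompt_id is a placeholder): independent calls can now run concurrently on one event loop.

import asyncio

from numexa.api_resources.apis import Completions, Generations  # assumed package layout

async def main() -> None:
    # Both coroutines are scheduled together instead of blocking sequentially.
    completion, generation = await asyncio.gather(
        Completions.create(prompt="Say hello in one word."),
        Generations.create(prompt_id="<prompt-id>", variables={"name": "Numexa"}),
    )
    print(completion, generation)

asyncio.run(main())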
81 changes: 68 additions & 13 deletions numexa/api_resources/base_client.py
@@ -2,6 +2,8 @@

import json
import os
import socket
from datetime import datetime
from types import TracebackType
from typing import (
Dict,
@@ -18,7 +20,7 @@
)
import httpx
import platform
- from .global_constants import NUMEXA_HEADER_PREFIX, OPEN_API_KEY
+ from .global_constants import NUMEXA_HEADER_PREFIX, OPEN_API_KEY, NUMEXA_INGEST_LOGS
from .utils import (
remove_empty_values,
Body,
@@ -50,7 +52,7 @@ def __init__(self) -> None:


class APIClient:
- _client: httpx.Client
+ _client: httpx.AsyncClient
_default_stream_cls: Union[type[Stream[Any]], None] = None

def __init__(
@@ -61,7 +63,7 @@ def __init__(
) -> None:
self.api_key = api_key or default_api_key()
self.base_url = base_url or default_base_url()
- self._client = httpx.Client(
+ self._client = httpx.AsyncClient(
base_url=self.base_url,
headers={"Accept": "application/json"},
)
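For reference, a standalone sketch of the httpx pattern the client now depends on; the URL and payload are placeholders, not values from this repository. httpx.AsyncClient builds a request, sends it from a coroutine, and should eventually be closed with aclose().

import asyncio
import httpx

async def demo() -> None:
    client = httpx.AsyncClient(base_url="https://example.invalid",
                               headers={"Accept": "application/json"})
    try:
        request = client.build_request("POST", "/v1/demo", json={"ping": "pong"})
        response = await client.send(request)  # mirrors the send() call in _request below
        print(response.status_code)
    finally:
        await client.aclose()  # AsyncClient must be closed from async code

asyncio.run(demo())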
@@ -83,7 +85,7 @@ def custom_auth(self) -> Optional[httpx.Auth]:
return None

@overload
- def post(
+ async def post(
self,
path: str,
*,
@@ -97,7 +99,7 @@ def post(
...

@overload
- def post(
+ async def post(
self,
path: str,
*,
@@ -111,7 +113,7 @@ def post(
...

@overload
- def post(
+ async def post(
self,
path: str,
*,
@@ -124,7 +126,7 @@ def post(
) -> Union[ResponseT, StreamT]:
...

- def post(
+ async def post(
self,
path: str,
*,
@@ -169,7 +171,7 @@ def post(
else:
raise NotImplementedError(f"This API path `{path}` is not implemented.")

- res = self._request(
+ res = await self._request(
options=opts,
stream=stream,
cast_to=cast_to,
@@ -328,7 +330,7 @@ def _build_request(self, options: Options) -> List[httpx.Request]:
)
return [request]

- def _build_request_direct(self, options: Options) -> List[httpx.Request]:
+ async def _build_request_direct(self, options: Options) -> List[httpx.Request]:
headers = self._build_headers(options)
new_payload = dict()
request_list = []
@@ -382,7 +384,7 @@ def _request(
) -> Union[ResponseT, StreamT]:
...

- def _request(
+ async def _request(
self,
*,
options: Options,
@@ -393,13 +395,52 @@ def _request(
# proxy on
if not os.environ.get("NUMEXA_PROXY"):
# todo: change _build_request_direct to _build_request
- request_list = self._build_request_direct(options)
+ request_list = await self._build_request_direct(options)
# proxy off
else:
- request_list = self._build_request_direct(options)
+ request_list = await self._build_request_direct(options)
for request in request_list:
try:
- res = self._client.send(request, auth=self.custom_auth, stream=stream)
+ initiated_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+ res = await self._client.send(request, auth=self.custom_auth, stream=stream)
if os.environ.get("NUMEXA_PROXY"):
response_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# Preparing Monger Request Body
monger_request_body = dict()
monger_request_body["request_time"] = initiated_timestamp
hostname = socket.gethostname()
monger_request_body["source_ip"] = socket.gethostbyname(hostname)
monger_request_body["request_method"] = "POST"
monger_request_body["request_url"] = str(request.url)
monger_request_body["request_body"] = json.loads(request.content.decode("utf-8"))
request.headers.update(
{"X-Numexa-Log-Type": "request", "X-Numexa-Api-Key": os.environ.get("NUMEXA_API_KEY")})
request.headers.update(
{"Content-Length": str(len(json.dumps(monger_request_body, default=str).encode("utf-8")))})
# Monger Request Ingestion Start
monger_request_option = Options(method=request.method, headers=request.headers,
json_body=monger_request_body,
url=NUMEXA_INGEST_LOGS)
monger_request = self._build_monger(options=monger_request_option)
await self._client.send(monger_request, auth=self.custom_auth, stream=stream)
# Monger Request Ingestion End

# Preparing Monger Response Body
monger_response_body = dict()
monger_response_body["initiated_timestamp"] = initiated_timestamp
monger_response_body["response_timestamp"] = response_timestamp
monger_response_body["response_status_code"] = res.status_code
monger_response_body["response_body"] = json.loads(res.content.decode("utf-8"))
res.headers.update(
{"X-Numexa-Log-Type": "response", "X-Numexa-Api-Key": os.environ.get("NUMEXA_API_KEY")})
res.headers.update(
{"Content-Length": str(len(json.dumps(monger_response_body, default=str).encode("utf-8")))})
# Monger Response Ingestion Start
monger_response_option = Options(method=request.method, headers=res.headers,
json_body=monger_response_body, url=NUMEXA_INGEST_LOGS)
monger_response = self._build_monger(options=monger_response_option)
await self._client.send(monger_response, auth=self.custom_auth, stream=stream)
# Monger Response Ingestion End
res.raise_for_status()
except httpx.HTTPStatusError as err: # 4xx and 5xx errors
# If the response is streamed then we need to explicitly read the response
@@ -431,6 +472,20 @@ def _extract_stream_chunk_type(self, stream_cls: Type) -> type:
f"Expected stream_cls to have been given a generic type argument, e.g. Stream[Foo] but received {stream_cls}",
)
return cast(type, args[0])

def _build_monger(self, options: Options) -> httpx.Request:
headers = options.headers
method = options.method
params = options.params
url = options.url
json_body = options.json_body
return self._client.build_request(
method=method,
url=url,
headers=headers,
params=params,
json=json_body,
timeout=options.timeout)

def _make_status_error_from_response(
self,
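To summarize the observability path added above: when NUMEXA_PROXY is set, each outbound request and its response are mirrored to NUMEXA_INGEST_LOGS through _build_monger, tagged with X-Numexa-Log-Type and X-Numexa-Api-Key headers. A condensed sketch of the two payloads (field names come from the diff; all values are illustrative):

# "request" log record, sent with X-Numexa-Log-Type: request
monger_request_body = {
    "request_time": "2023-10-08 12:00:00.000",   # initiated_timestamp
    "source_ip": "10.0.0.5",                     # socket.gethostbyname(socket.gethostname())
    "request_method": "POST",
    "request_url": "https://api.example.com/v1/chat/completions",  # placeholder URL
    "request_body": {"messages": []},            # decoded request content
}

# "response" log record, sent with X-Numexa-Log-Type: response
monger_response_body = {
    "initiated_timestamp": "2023-10-08 12:00:00.000",
    "response_timestamp": "2023-10-08 12:00:01.234",
    "response_status_code": 200,
    "response_body": {"choices": []},            # decoded response content
}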
1 change: 1 addition & 0 deletions numexa/api_resources/global_constants.py
@@ -34,3 +34,4 @@
NUMEXA_PROXY_URL = "https://app.numexa.io/proxy/v1/openai"
NUMEXA_PROXY = "NUMEXA_PROXY"
OPEN_API_KEY = "OPEN_API_KEY"
NUMEXA_INGEST_LOGS = "https://app.numexa.io/proxy/v1/logs"
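A short sketch of the environment variables the new code path consults; the names appear in the diff, while the comments describe behaviour inferred from _request above:

import os

os.environ["NUMEXA_API_KEY"] = "nx-..."  # placeholder; sent as X-Numexa-Api-Key on each log record
os.environ["NUMEXA_PROXY"] = "1"         # when set, _request also ships request/response logs to NUMEXA_INGEST_LOGS
# When NUMEXA_PROXY is unset, requests are still built via _build_request_direct,
# but no log records are ingested.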
