Skip to content

Commit

Permalink
feat: show image builder logs (#4796)
Browse files Browse the repository at this point in the history
  • Loading branch information
yetone committed Jun 14, 2024
1 parent 32e7057 commit ddb653d
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 55 deletions.
39 changes: 35 additions & 4 deletions pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ dependencies = [
# for manipulating pyproject.toml
"tomli>=1.1.0; python_version < \"3.11\"",
"tomli-w",
"httpx-ws>=0.6.0",
]
dynamic = ["version"]
[project.urls]
Expand Down
2 changes: 1 addition & 1 deletion src/bentoml/_internal/cloud/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def write(self, data: bytes) -> t.Any: # type: ignore # python buffer types ar
return res


class Spinner(ABC):
class Spinner:
def __init__(self):
self.log_progress = Progress(TextColumn("{task.description}"))

Expand Down
88 changes: 88 additions & 0 deletions src/bentoml/_internal/cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
import contextlib
import logging
import typing as t
import uuid
from urllib.parse import urlencode
from urllib.parse import urljoin
from urllib.parse import urlparse

import httpx
from httpx_ws import WebSocketNetworkError
from httpx_ws import WebSocketSession
from httpx_ws import connect_ws

from ...exceptions import CloudRESTApiClientError
from ...exceptions import NotFound
Expand Down Expand Up @@ -38,6 +44,9 @@
from .schemas.schemasv2 import CreateDeploymentSchema as CreateDeploymentSchemaV2
from .schemas.schemasv2 import DeploymentFullSchema as DeploymentFullSchemaV2
from .schemas.schemasv2 import DeploymentListSchema as DeploymentListSchemaV2
from .schemas.schemasv2 import KubePodSchema
from .schemas.schemasv2 import KubePodWSResponseSchema
from .schemas.schemasv2 import LogWSResponseSchema
from .schemas.schemasv2 import UpdateDeploymentSchema as UpdateDeploymentSchemaV2
from .schemas.utils import schema_from_json
from .schemas.utils import schema_from_object
Expand Down Expand Up @@ -676,6 +685,85 @@ def list_instance_types(
self._check_resp(resp)
return schema_from_json(resp.text, list[ResourceInstanceSchema])

def get_deployment_image_builder_pod(
self, name: str, cluster: str | None = None
) -> KubePodSchema | None:
pods = self.list_deployment_pods(name, cluster=cluster)
if not pods:
raise NotFound(f"Deployment {name} pods is not found")
for pod in pods:
if pod.labels.get("yatai.ai/is-bento-image-builder") == "true":
return pod
return None

def list_deployment_pods(
self, name: str, cluster: str | None = None
) -> list[KubePodSchema]:
deployment = self.get_deployment(name, cluster=cluster)
if not deployment.latest_revision:
raise NotFound(f"Deployment {name} latest revision is not found")
if not deployment.latest_revision.targets:
raise NotFound(f"Deployment {name} latest revision targets is not found")
target = deployment.latest_revision.targets[0]
if not target:
raise NotFound(f"Deployment {name} latest revision target is not found")
if not target.bento:
raise NotFound(
f"Deployment {name} latest revision target bento is not found"
)
url_ = urlparse(self.endpoint)
scheme = "wss"
if url_.scheme == "http":
scheme = "ws"
endpoint = f"{scheme}://{url_.netloc}"
with connect_ws(
url=f"{endpoint}/ws/v1/clusters/{deployment.cluster.name}/pods?{urlencode(dict(organization_name=deployment.cluster.organization_name, namespace=deployment.kube_namespace, selector=f'yatai.ai/bento-repository={target.bento.repository.name},yatai.ai/bento={target.bento.version}'))}",
client=self.session,
) as ws:
jsn = schema_from_object(ws.receive_json(), KubePodWSResponseSchema)
if jsn.type == "error":
raise CloudRESTApiClientError(jsn.message)
return jsn.payload

def tail_logs(
self,
*,
cluster_name: str,
namespace: str,
pod_name: str,
container_name: str = "main",
) -> t.Generator[t.Tuple[str, WebSocketSession], None]:
url_ = urlparse(self.endpoint)
scheme = "wss"
if url_.scheme == "http":
scheme = "ws"
endpoint = f"{scheme}://{url_.netloc}"
with connect_ws(
url=f"{endpoint}/ws/v1/clusters/{cluster_name}/tail?{urlencode(dict(namespace=namespace, pod_name=pod_name))}",
client=self.session,
) as ws:
req_id = str(uuid.uuid4())
ws.send_json(
{
"type": "data",
"payload": {
"id": req_id,
"container_name": container_name,
"follow": True,
"tail_lines": 50,
},
}
)
while True:
try:
jsn = schema_from_object(ws.receive_json(), LogWSResponseSchema)
if jsn.type == "error":
raise CloudRESTApiClientError(jsn.message)
for line in jsn.payload.items:
yield (line, ws)
except WebSocketNetworkError:
break


class RestApiClient:
def __init__(self, endpoint: str, api_token: str) -> None:
Expand Down
Loading

0 comments on commit ddb653d

Please sign in to comment.