Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Improve access log messages #48819

Merged
merged 17 commits into from
Nov 21, 2024
3 changes: 1 addition & 2 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,8 @@
# Logging format with record key to format string dict
SERVE_LOG_RECORD_FORMAT = {
SERVE_LOG_REQUEST_ID: "%(request_id)s",
SERVE_LOG_ROUTE: "%(route)s",
SERVE_LOG_APPLICATION: "%(application)s",
SERVE_LOG_MESSAGE: "%(filename)s:%(lineno)d - %(message)s",
SERVE_LOG_MESSAGE: "-- %(message)s",
SERVE_LOG_LEVEL_NAME: "%(levelname)s",
SERVE_LOG_TIME: "%(asctime)s",
}
Expand Down
6 changes: 2 additions & 4 deletions python/ray/serve/_private/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,6 @@ def format(self, record: logging.LogRecord) -> str:
record_formats_attrs = []
if SERVE_LOG_REQUEST_ID in record.__dict__:
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_REQUEST_ID])
if SERVE_LOG_ROUTE in record.__dict__:
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_ROUTE])
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE])
record_format += " ".join(record_formats_attrs)

Expand All @@ -153,9 +151,9 @@ def format(self, record: logging.LogRecord) -> str:
return formatter.format(record)


def access_log_msg(*, method: str, status: str, latency_ms: float):
def access_log_msg(*, method: str, route: str, status: str, latency_ms: float):
"""Returns a formatted message for an HTTP or ServeHandle access log."""
return f"{method.upper()} {status.upper()} {latency_ms:.1f}ms"
return f"{method} {route} {status} {latency_ms:.1f}ms"


def log_to_stderr_filter(record: logging.LogRecord) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions python/ray/serve/_private/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,11 @@ async def proxy_request(self, proxy_request: ProxyRequest) -> ResponseGenerator:

latency_ms = (time.time() - start_time) * 1000.0
if response_handler_info.should_record_access_log:
request_context = ray.serve.context._serve_request_context.get()
logger.info(
access_log_msg(
method=proxy_request.method,
route=request_context.route,
status=str(status.code),
latency_ms=latency_ms,
),
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/_private/proxy_request_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def request_type(self) -> str:

@property
def method(self) -> str:
return self.scope.get("method", "websocket").upper()
# WebSocket messages don't have a 'method' field.
return self.scope.get("method", "WS").upper()

@property
def route_path(self) -> str:
Expand Down
113 changes: 87 additions & 26 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@
from contextlib import contextmanager
from functools import wraps
from importlib import import_module
from typing import Any, AsyncGenerator, Callable, Dict, Optional, Tuple, Union
from typing import (
Any,
AsyncGenerator,
Callable,
Dict,
Generator,
Optional,
Tuple,
Union,
)

import starlette.responses
from starlette.types import ASGIApp, Message
Expand Down Expand Up @@ -233,12 +242,10 @@ def _add_autoscaling_metrics_point(self) -> None:
)


class ReplicaBase(ABC):
"""
All interaction with the user-provided callable is done via the
`UserCallableWrapper` class.
"""
StatusCodeCallback = Callable[[str], None]


class ReplicaBase(ABC):
def __init__(
self,
replica_id: ReplicaID,
Expand Down Expand Up @@ -326,7 +333,7 @@ def _configure_logger_and_profilers(
def get_num_ongoing_requests(self):
return self._metrics_manager.get_num_ongoing_requests()

def _maybe_get_asgi_route(
def _maybe_get_http_route(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
) -> Optional[str]:
"""Get the matched route string for ASGI apps to be used in logs & metrics.
Expand Down Expand Up @@ -359,13 +366,36 @@ def _maybe_get_asgi_route(

return route

def _maybe_get_http_method(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
) -> Optional[str]:
"""Get the HTTP method to be used in logs & metrics.

If this is not an HTTP request, returns None.
"""
if request_metadata.is_http_request:
req: StreamingHTTPRequest = request_args[0]
# WebSocket messages don't have a 'method' field.
return req.asgi_scope.get("method", "WS")

return None

@contextmanager
def _handle_errors_and_metrics(self, request_metadata):
def _handle_errors_and_metrics(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
) -> Generator[StatusCodeCallback, None, None]:
start_time = time.time()
user_exception = None

status_code = None

def _status_code_callback(s: str):
nonlocal status_code
status_code = s

try:
self._metrics_manager.inc_num_ongoing_requests()
yield
yield _status_code_callback
except asyncio.CancelledError as e:
user_exception = e
self._on_request_cancelled(request_metadata, e)
Expand All @@ -384,16 +414,21 @@ def _handle_errors_and_metrics(self, request_metadata):
else:
status_str = "ERROR"

http_method = self._maybe_get_http_method(request_metadata, request_args)
http_route = request_metadata.route
# Set in _wrap_user_method_call.
logger.info(
access_log_msg(
method=request_metadata.call_method,
status=status_str,
method=http_method or "CALL",
edoakes marked this conversation as resolved.
Show resolved Hide resolved
route=http_route or request_metadata.call_method,
# Prefer the HTTP status code if it was populated.
status=status_code or status_str,
latency_ms=latency_ms,
),
extra={"serve_access_log": True},
)
self._metrics_manager.record_request_metrics(
route=request_metadata.route,
route=http_route,
status_str=status_str,
latency_ms=latency_ms,
was_error=user_exception is not None,
Expand All @@ -407,6 +442,7 @@ async def _call_user_generator(
request_metadata: RequestMetadata,
request_args: Tuple[Any],
request_kwargs: Dict[str, Any],
status_code_callback: StatusCodeCallback,
) -> AsyncGenerator[Any, None]:
"""Calls a user method for a streaming call and yields its results.

Expand All @@ -432,6 +468,7 @@ def _enqueue_thread_safe(item: Any):
)
)

first_message_peeked = False
while True:
wait_for_message_task = self._event_loop.create_task(
result_queue.wait_for_message()
Expand All @@ -448,6 +485,16 @@ def _enqueue_thread_safe(item: Any):
# and use vanilla pickle (we know it's safe because these messages
# only contain primitive Python types).
if request_metadata.is_http_request:
# Peek the first ASGI message to determine the status code.
if not first_message_peeked:
msg = messages[0]
first_message_peeked = True
if msg["type"] == "http.response.start":
# HTTP responses begin with exactly one
# "http.response.start" message containing the "status"
# field. Other response types like WebSockets may not.
status_code_callback(str(msg["status"]))

yield pickle.dumps(messages)
else:
for msg in messages:
Expand All @@ -472,7 +519,7 @@ def _enqueue_thread_safe(item: Any):
wait_for_message_task.cancel()

async def handle_request(
self, request_metadata, *request_args, **request_kwargs
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
) -> Tuple[bytes, Any]:
with self._wrap_user_method_call(request_metadata, request_args):
return await asyncio.wrap_future(
Expand All @@ -482,18 +529,22 @@ async def handle_request(
)

async def handle_request_streaming(
self, request_metadata, *request_args, **request_kwargs
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
) -> AsyncGenerator[Any, None]:
with self._wrap_user_method_call(request_metadata, request_args):
"""Generator that is the entrypoint for all `stream=True` handle calls."""
with self._wrap_user_method_call(
request_metadata, request_args
) as status_code_callback:
async for result in self._call_user_generator(
request_metadata,
request_args,
request_kwargs,
status_code_callback=status_code_callback,
):
yield result

async def handle_request_with_rejection(
self, request_metadata, *request_args, **request_kwargs
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
):
limit = self._deployment_config.max_ongoing_requests
num_ongoing_requests = self.get_num_ongoing_requests()
Expand All @@ -508,7 +559,9 @@ async def handle_request_with_rejection(
)
return

with self._wrap_user_method_call(request_metadata, request_args):
with self._wrap_user_method_call(
request_metadata, request_args
) as status_code_callback:
yield ReplicaQueueLengthInfo(
accepted=True,
# NOTE(edoakes): `_wrap_user_method_call` will increment the number
Expand All @@ -521,6 +574,7 @@ async def handle_request_with_rejection(
request_metadata,
request_args,
request_kwargs,
status_code_callback=status_code_callback,
):
yield result
else:
Expand All @@ -534,7 +588,7 @@ async def handle_request_with_rejection(
async def _on_initialized(self):
raise NotImplementedError

async def initialize(self, deployment_config):
async def initialize(self, deployment_config: DeploymentConfig):
try:
# Ensure that initialization is only performed once.
# When controller restarts, it will call this method again.
Expand Down Expand Up @@ -620,7 +674,7 @@ def _on_request_failed(self, request_metadata: RequestMetadata, e: Exception):
@contextmanager
def _wrap_user_method_call(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
):
) -> Generator[StatusCodeCallback, None, None]:
pass

async def _drain_ongoing_requests(self):
Expand Down Expand Up @@ -708,16 +762,16 @@ def _on_request_failed(self, request_metadata: RequestMetadata, e: Exception):
@contextmanager
def _wrap_user_method_call(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
):
) -> Generator[StatusCodeCallback, None, None]:
"""Context manager that wraps user method calls.

1) Sets the request context var with appropriate metadata.
2) Records the access log message (if not disabled).
3) Records per-request metrics via the metrics manager.
"""
route = self._maybe_get_asgi_route(request_metadata, request_args)
request_metadata.route = route

request_metadata.route = self._maybe_get_http_route(
request_metadata, request_args
)
ray.serve.context._serve_request_context.set(
ray.serve.context._RequestContext(
route=request_metadata.route,
Expand All @@ -729,8 +783,10 @@ def _wrap_user_method_call(
)
)

with self._handle_errors_and_metrics(request_metadata):
yield
with self._handle_errors_and_metrics(
request_metadata, request_args
) as status_code_callback:
yield status_code_callback


class ReplicaActor:
Expand Down Expand Up @@ -1338,7 +1394,12 @@ async def call_user_method(
)

except Exception:
if request_metadata.is_http_request and asgi_args is not None:
if (
request_metadata.is_http_request
and asgi_args is not None
# If the callable is an ASGI app, it already sent a 500 status response.
and not is_asgi_app
):
await self._send_user_result_over_asgi(
starlette.responses.Response(
"Internal Server Error", status_code=500
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_controller_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ def check_proxy_handle_in_controller():
resp = requests.get("http://127.0.0.1:8000")
assert resp.status_code == 200
wait_for_condition(
check_log_file, log_file=file_path, expected_regex=['.*"message":.*GET 200.*']
check_log_file, log_file=file_path, expected_regex=['.*"message":.*GET / 200.*']
)


Expand Down
Loading