Skip to content

Commit

Permalink
[serve] Improve access log messages (#48819)
Browse files Browse the repository at this point in the history
## Why are these changes needed?

Improves the Serve access log to include HTTP status information on the
replica and better resemble standard log formats like `uvicorn`.

I'm updated the log format to:

- Include the HTTP method, route and status code for HTTP requests like:
"GET / 200".
- Use "CALL", the method name, and "OK"/"ERROR" for `DeploymentHandle`
calls like: "CALL method_name OK".
- Stop logging the `route` information on every message and isolate it
to the access log message instead. This adds clutter and it can be tied
to the access log message using the request ID.
- Stop logging the filename and line number in every log message. This
has been on my TODO list for awhile after internal discussion.

### Examples

Calling a handle method:
```
INFO 2024-11-20 13:51:44,803 default_D lswbv98w d0cadb74-9fd8-4a68-9e9a-c9ed20b091f4 -- CALL method OK 1.3ms
```

Calling a basic HTTP route:
```
INFO 2024-11-20 13:53:00,197 default_A zosusx8c 26891433-a91a-4d27-b543-0936feb5f5c1 -- GET / 200 4.1ms
```

Calling a wildcard HTTP route (and non-200 status code):
```
INFO 2024-11-20 13:53:58,102 default_A iovmsu5e 85000f14-8e31-42a5-a1f4-0fa2d39c549a -- GET /{wildcard} 422 3.7ms
```

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
  • Loading branch information
edoakes authored Nov 21, 2024
1 parent e51cec6 commit 569f7df
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 47 deletions.
3 changes: 1 addition & 2 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,8 @@
# Logging format with record key to format string dict
SERVE_LOG_RECORD_FORMAT = {
SERVE_LOG_REQUEST_ID: "%(request_id)s",
SERVE_LOG_ROUTE: "%(route)s",
SERVE_LOG_APPLICATION: "%(application)s",
SERVE_LOG_MESSAGE: "%(filename)s:%(lineno)d - %(message)s",
SERVE_LOG_MESSAGE: "-- %(message)s",
SERVE_LOG_LEVEL_NAME: "%(levelname)s",
SERVE_LOG_TIME: "%(asctime)s",
}
Expand Down
6 changes: 2 additions & 4 deletions python/ray/serve/_private/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,6 @@ def format(self, record: logging.LogRecord) -> str:
record_formats_attrs = []
if SERVE_LOG_REQUEST_ID in record.__dict__:
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_REQUEST_ID])
if SERVE_LOG_ROUTE in record.__dict__:
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_ROUTE])
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE])
record_format += " ".join(record_formats_attrs)

Expand All @@ -153,9 +151,9 @@ def format(self, record: logging.LogRecord) -> str:
return formatter.format(record)


def access_log_msg(*, method: str, status: str, latency_ms: float):
def access_log_msg(*, method: str, route: str, status: str, latency_ms: float):
"""Returns a formatted message for an HTTP or ServeHandle access log."""
return f"{method.upper()} {status.upper()} {latency_ms:.1f}ms"
return f"{method} {route} {status} {latency_ms:.1f}ms"


def log_to_stderr_filter(record: logging.LogRecord) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions python/ray/serve/_private/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,11 @@ async def proxy_request(self, proxy_request: ProxyRequest) -> ResponseGenerator:

latency_ms = (time.time() - start_time) * 1000.0
if response_handler_info.should_record_access_log:
request_context = ray.serve.context._serve_request_context.get()
logger.info(
access_log_msg(
method=proxy_request.method,
route=request_context.route,
status=str(status.code),
latency_ms=latency_ms,
),
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/_private/proxy_request_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def request_type(self) -> str:

@property
def method(self) -> str:
return self.scope.get("method", "websocket").upper()
# WebSocket messages don't have a 'method' field.
return self.scope.get("method", "WS").upper()

@property
def route_path(self) -> str:
Expand Down
113 changes: 87 additions & 26 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@
from contextlib import contextmanager
from functools import wraps
from importlib import import_module
from typing import Any, AsyncGenerator, Callable, Dict, Optional, Tuple, Union
from typing import (
Any,
AsyncGenerator,
Callable,
Dict,
Generator,
Optional,
Tuple,
Union,
)

import starlette.responses
from starlette.types import ASGIApp, Message
Expand Down Expand Up @@ -233,12 +242,10 @@ def _add_autoscaling_metrics_point(self) -> None:
)


class ReplicaBase(ABC):
"""
All interaction with the user-provided callable is done via the
`UserCallableWrapper` class.
"""
StatusCodeCallback = Callable[[str], None]


class ReplicaBase(ABC):
def __init__(
self,
replica_id: ReplicaID,
Expand Down Expand Up @@ -326,7 +333,7 @@ def _configure_logger_and_profilers(
def get_num_ongoing_requests(self):
return self._metrics_manager.get_num_ongoing_requests()

def _maybe_get_asgi_route(
def _maybe_get_http_route(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
) -> Optional[str]:
"""Get the matched route string for ASGI apps to be used in logs & metrics.
Expand Down Expand Up @@ -359,13 +366,36 @@ def _maybe_get_asgi_route(

return route

def _maybe_get_http_method(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
) -> Optional[str]:
"""Get the HTTP method to be used in logs & metrics.
If this is not an HTTP request, returns None.
"""
if request_metadata.is_http_request:
req: StreamingHTTPRequest = request_args[0]
# WebSocket messages don't have a 'method' field.
return req.asgi_scope.get("method", "WS")

return None

@contextmanager
def _handle_errors_and_metrics(self, request_metadata):
def _handle_errors_and_metrics(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
) -> Generator[StatusCodeCallback, None, None]:
start_time = time.time()
user_exception = None

status_code = None

def _status_code_callback(s: str):
nonlocal status_code
status_code = s

try:
self._metrics_manager.inc_num_ongoing_requests()
yield
yield _status_code_callback
except asyncio.CancelledError as e:
user_exception = e
self._on_request_cancelled(request_metadata, e)
Expand All @@ -384,16 +414,21 @@ def _handle_errors_and_metrics(self, request_metadata):
else:
status_str = "ERROR"

http_method = self._maybe_get_http_method(request_metadata, request_args)
http_route = request_metadata.route
# Set in _wrap_user_method_call.
logger.info(
access_log_msg(
method=request_metadata.call_method,
status=status_str,
method=http_method or "CALL",
route=http_route or request_metadata.call_method,
# Prefer the HTTP status code if it was populated.
status=status_code or status_str,
latency_ms=latency_ms,
),
extra={"serve_access_log": True},
)
self._metrics_manager.record_request_metrics(
route=request_metadata.route,
route=http_route,
status_str=status_str,
latency_ms=latency_ms,
was_error=user_exception is not None,
Expand All @@ -407,6 +442,7 @@ async def _call_user_generator(
request_metadata: RequestMetadata,
request_args: Tuple[Any],
request_kwargs: Dict[str, Any],
status_code_callback: StatusCodeCallback,
) -> AsyncGenerator[Any, None]:
"""Calls a user method for a streaming call and yields its results.
Expand All @@ -432,6 +468,7 @@ def _enqueue_thread_safe(item: Any):
)
)

first_message_peeked = False
while True:
wait_for_message_task = self._event_loop.create_task(
result_queue.wait_for_message()
Expand All @@ -448,6 +485,16 @@ def _enqueue_thread_safe(item: Any):
# and use vanilla pickle (we know it's safe because these messages
# only contain primitive Python types).
if request_metadata.is_http_request:
# Peek the first ASGI message to determine the status code.
if not first_message_peeked:
msg = messages[0]
first_message_peeked = True
if msg["type"] == "http.response.start":
# HTTP responses begin with exactly one
# "http.response.start" message containing the "status"
# field. Other response types like WebSockets may not.
status_code_callback(str(msg["status"]))

yield pickle.dumps(messages)
else:
for msg in messages:
Expand All @@ -472,7 +519,7 @@ def _enqueue_thread_safe(item: Any):
wait_for_message_task.cancel()

async def handle_request(
self, request_metadata, *request_args, **request_kwargs
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
) -> Tuple[bytes, Any]:
with self._wrap_user_method_call(request_metadata, request_args):
return await asyncio.wrap_future(
Expand All @@ -482,18 +529,22 @@ async def handle_request(
)

async def handle_request_streaming(
self, request_metadata, *request_args, **request_kwargs
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
) -> AsyncGenerator[Any, None]:
with self._wrap_user_method_call(request_metadata, request_args):
"""Generator that is the entrypoint for all `stream=True` handle calls."""
with self._wrap_user_method_call(
request_metadata, request_args
) as status_code_callback:
async for result in self._call_user_generator(
request_metadata,
request_args,
request_kwargs,
status_code_callback=status_code_callback,
):
yield result

async def handle_request_with_rejection(
self, request_metadata, *request_args, **request_kwargs
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
):
limit = self._deployment_config.max_ongoing_requests
num_ongoing_requests = self.get_num_ongoing_requests()
Expand All @@ -508,7 +559,9 @@ async def handle_request_with_rejection(
)
return

with self._wrap_user_method_call(request_metadata, request_args):
with self._wrap_user_method_call(
request_metadata, request_args
) as status_code_callback:
yield ReplicaQueueLengthInfo(
accepted=True,
# NOTE(edoakes): `_wrap_user_method_call` will increment the number
Expand All @@ -521,6 +574,7 @@ async def handle_request_with_rejection(
request_metadata,
request_args,
request_kwargs,
status_code_callback=status_code_callback,
):
yield result
else:
Expand All @@ -534,7 +588,7 @@ async def handle_request_with_rejection(
async def _on_initialized(self):
raise NotImplementedError

async def initialize(self, deployment_config):
async def initialize(self, deployment_config: DeploymentConfig):
try:
# Ensure that initialization is only performed once.
# When controller restarts, it will call this method again.
Expand Down Expand Up @@ -620,7 +674,7 @@ def _on_request_failed(self, request_metadata: RequestMetadata, e: Exception):
@contextmanager
def _wrap_user_method_call(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
):
) -> Generator[StatusCodeCallback, None, None]:
pass

async def _drain_ongoing_requests(self):
Expand Down Expand Up @@ -708,16 +762,16 @@ def _on_request_failed(self, request_metadata: RequestMetadata, e: Exception):
@contextmanager
def _wrap_user_method_call(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
):
) -> Generator[StatusCodeCallback, None, None]:
"""Context manager that wraps user method calls.
1) Sets the request context var with appropriate metadata.
2) Records the access log message (if not disabled).
3) Records per-request metrics via the metrics manager.
"""
route = self._maybe_get_asgi_route(request_metadata, request_args)
request_metadata.route = route

request_metadata.route = self._maybe_get_http_route(
request_metadata, request_args
)
ray.serve.context._serve_request_context.set(
ray.serve.context._RequestContext(
route=request_metadata.route,
Expand All @@ -729,8 +783,10 @@ def _wrap_user_method_call(
)
)

with self._handle_errors_and_metrics(request_metadata):
yield
with self._handle_errors_and_metrics(
request_metadata, request_args
) as status_code_callback:
yield status_code_callback


class ReplicaActor:
Expand Down Expand Up @@ -1338,7 +1394,12 @@ async def call_user_method(
)

except Exception:
if request_metadata.is_http_request and asgi_args is not None:
if (
request_metadata.is_http_request
and asgi_args is not None
# If the callable is an ASGI app, it already sent a 500 status response.
and not is_asgi_app
):
await self._send_user_result_over_asgi(
starlette.responses.Response(
"Internal Server Error", status_code=500
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_controller_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ def check_proxy_handle_in_controller():
resp = requests.get("http://127.0.0.1:8000")
assert resp.status_code == 200
wait_for_condition(
check_log_file, log_file=file_path, expected_regex=['.*"message":.*GET 200.*']
check_log_file, log_file=file_path, expected_regex=['.*"message":.*GET / 200.*']
)


Expand Down
Loading

0 comments on commit 569f7df

Please sign in to comment.