Skip to content

Commit

Permalink
[serve] configure logging for build app task
Browse files Browse the repository at this point in the history
Configure logging for the `build_serve_application` task. Although it is a short-lived task, it is part of every application deployment process, and this can improve observability for the deployment process.
Added some light logging to the task as well.
Example:
```
INFO 2024-07-01 16:54:13,660 build_default_task 15893 application_state.py:1047 - Importing application 'default'.
WARNING 2024-07-01 16:54:13,660 build_default_task 15893 api.py:432 - The default value for `max_ongoing_requests` has changed from 100 to 5 in Ray 2.32.0.
ERROR 2024-07-01 16:54:13,661 build_default_task 15893 application_state.py:1077 - Exception importing application 'default'.
Traceback (most recent call last):
  File "/Users/cindyz/ray/python/ray/serve/_private/application_state.py", line 1049, in build_serve_application
    app = call_app_builder_with_args_if_necessary(import_attr(import_path), args)
  File "/Users/cindyz/ray/python/ray/_private/utils.py", line 1194, in import_attr
    return getattr(module, attr_name)
AttributeError: module 'hello' has no attribute 'haa'
```


Signed-off-by: Cindy Zhang <cindyzyx9@gmail.com>
  • Loading branch information
zcin committed Jul 2, 2024
1 parent db2288c commit c09bd44
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 25 deletions.
65 changes: 41 additions & 24 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
import time
import traceback
from copy import deepcopy
Expand Down Expand Up @@ -30,6 +31,7 @@
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve._private.deployment_state import DeploymentStateManager
from ray.serve._private.endpoint_state import EndpointState
from ray.serve._private.logging_utils import configure_component_logger
from ray.serve._private.storage.kv_store import KVStoreBase
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import (
Expand All @@ -40,7 +42,7 @@
from ray.serve.config import AutoscalingConfig
from ray.serve.exceptions import RayServeException
from ray.serve.generated.serve_pb2 import DeploymentLanguage
from ray.serve.schema import DeploymentDetails, ServeApplicationSchema
from ray.serve.schema import DeploymentDetails, LoggingConfig, ServeApplicationSchema
from ray.types import ObjectRef

logger = logging.getLogger(SERVE_LOGGER_NAME)
Expand Down Expand Up @@ -123,6 +125,7 @@ def __init__(
deployment_state_manager: DeploymentStateManager,
endpoint_state: EndpointState,
save_checkpoint_func: Callable,
logging_config: LoggingConfig,
):
"""
Args:
Expand All @@ -143,7 +146,7 @@ def __init__(
self._endpoint_state = endpoint_state
self._route_prefix: Optional[str] = None
self._docs_path: Optional[str] = None
self._ingress_deployment_name: str = None
self._ingress_deployment_name: Optional[str] = None

self._status: ApplicationStatus = ApplicationStatus.DEPLOYING
self._deployment_timestamp = time.time()
Expand All @@ -161,6 +164,7 @@ def __init__(
api_type=APIType.UNKNOWN,
)
self._save_checkpoint_func = save_checkpoint_func
self._logging_config = logging_config

@property
def route_prefix(self) -> Optional[str]:
Expand All @@ -186,7 +190,7 @@ def status(self) -> ApplicationStatus:
return self._status

@property
def deployment_timestamp(self) -> int:
def deployment_timestamp(self) -> float:
return self._deployment_timestamp

@property
Expand Down Expand Up @@ -225,7 +229,7 @@ def _set_target_state(
deployment_infos: Optional[Dict[str, DeploymentInfo]],
*,
api_type: APIType,
code_version: str,
code_version: Optional[str],
target_config: Optional[ServeApplicationSchema],
target_capacity: Optional[float] = None,
target_capacity_direction: Optional[TargetCapacityDirection] = None,
Expand Down Expand Up @@ -383,7 +387,7 @@ def apply_app_config(
config: ServeApplicationSchema,
target_capacity: Optional[float],
target_capacity_direction: Optional[TargetCapacityDirection],
deployment_time: int,
deployment_time: float,
) -> None:
"""Apply the config to the application.
Expand All @@ -402,7 +406,6 @@ def apply_app_config(
if config_version == self._target_state.code_version:
try:
overrided_infos = override_deployment_info(
self._name,
self._target_state.deployment_infos,
config,
)
Expand Down Expand Up @@ -455,10 +458,10 @@ def apply_app_config(
enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS,
).remote(
config.import_path,
config.deployment_names,
config_version,
config.name,
config.args,
self._logging_config,
)
self._build_app_task_info = BuildAppTaskInfo(
obj_ref=build_app_obj_ref,
Expand Down Expand Up @@ -524,7 +527,7 @@ def _determine_app_status(self) -> Tuple[ApplicationStatus, str]:
else:
return ApplicationStatus.RUNNING, ""

def _reconcile_build_app_task(self) -> Tuple[Tuple, BuildAppStatus, str]:
def _reconcile_build_app_task(self) -> Tuple[Optional[Dict], BuildAppStatus, str]:
"""If necessary, reconcile the in-progress build task.
Returns:
Expand Down Expand Up @@ -580,7 +583,7 @@ def _reconcile_build_app_task(self) -> Tuple[Tuple, BuildAppStatus, str]:
for params in args
}
overrided_infos = override_deployment_info(
self._name, deployment_infos, self._build_app_task_info.config
deployment_infos, self._build_app_task_info.config
)
self._route_prefix, self._docs_path = self._check_routes(overrided_infos)
return overrided_infos, BuildAppStatus.SUCCEEDED, ""
Expand Down Expand Up @@ -774,10 +777,12 @@ def __init__(
deployment_state_manager: DeploymentStateManager,
endpoint_state: EndpointState,
kv_store: KVStoreBase,
logging_config: LoggingConfig,
):
self._deployment_state_manager = deployment_state_manager
self._endpoint_state = endpoint_state
self._kv_store = kv_store
self._logging_config = logging_config
self._application_states: Dict[str, ApplicationState] = dict()
self._recover_from_checkpoint()

Expand All @@ -792,6 +797,7 @@ def _recover_from_checkpoint(self):
self._deployment_state_manager,
self._endpoint_state,
self._save_checkpoint_func,
self._logging_config,
)
app_state.recover_target_state_from_checkpoint(checkpoint_data)
self._application_states[app_name] = app_state
Expand Down Expand Up @@ -819,20 +825,20 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:

# Make sure route_prefix is not being used by other application.
live_route_prefixes: Dict[str, str] = {
self._application_states[app_name].route_prefix: app_name
app_state.route_prefix: app_name
for app_name, app_state in self._application_states.items()
if app_state.route_prefix is not None
and not app_state.status == ApplicationStatus.DELETING
and name != app_name
}

for deploy_param in deployment_args:
deploy_app_prefix = deploy_param.get("route_prefix")
if deploy_app_prefix in live_route_prefixes:
deploy_app_prefix: str = deploy_param["route_prefix"]
app_name = live_route_prefixes.get(deploy_app_prefix)
if app_name is not None:
raise RayServeException(
f"Prefix {deploy_app_prefix} is being used by application "
f'"{live_route_prefixes[deploy_app_prefix]}".'
f' Failed to deploy application "{name}".'
f'"{app_name}". Failed to deploy application "{name}".'
)

if name not in self._application_states:
Expand All @@ -841,6 +847,7 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
self._deployment_state_manager,
self._endpoint_state,
self._save_checkpoint_func,
self._logging_config,
)
ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

Expand Down Expand Up @@ -875,6 +882,7 @@ def apply_app_configs(
self._deployment_state_manager,
endpoint_state=self._endpoint_state,
save_checkpoint_func=self._save_checkpoint_func,
logging_config=self._logging_config,
)

self._application_states[app_config.name].apply_app_config(
Expand Down Expand Up @@ -1001,31 +1009,32 @@ def _save_checkpoint_func(
@ray.remote(num_cpus=0, max_calls=1)
def build_serve_application(
import_path: str,
config_deployments: List[str],
code_version: str,
name: str,
args: Dict,
) -> Tuple[List[Dict], Optional[str]]:
logging_config: LoggingConfig,
) -> Tuple[Optional[List[Dict]], Optional[str]]:
"""Import and build a Serve application.
Args:
import_path: import path to top-level bound deployment.
config_deployments: list of deployment names specified in config
with deployment override options. This is used to check that
all deployments specified in the config are valid.
code_version: code version inferred from app config. All
deployment versions are set to this code version.
name: application name. If specified, application will be deployed
without removing existing applications.
args: Arguments to be passed to the application builder.
logging_config: The application logging config, if deployment logging
config is not set, application logging config will be applied to the
deployment logging config.
logging_config: the logging config for the build app task.
Returns:
Deploy arguments: a list of deployment arguments if application
was built successfully, otherwise None.
Error message: a string if an error was raised, otherwise None.
"""
configure_component_logger(
component_name="controller",
component_id=f"build_{name}_{os.getpid()}",
logging_config=logging_config,
)

try:
from ray.serve._private.api import call_app_builder_with_args_if_necessary
from ray.serve._private.deployment_graph_build import build as pipeline_build
Expand All @@ -1034,6 +1043,9 @@ def build_serve_application(
)

# Import and build the application.
args_info_str = f" with arguments {args}" if args else ""
logger.info(f"Importing application '{name}'{args_info_str}.")

app = call_app_builder_with_args_if_necessary(import_attr(import_path), args)
deployments = pipeline_build(app._get_internal_dag_node(), name)
ingress = get_and_validate_ingress_deployment(deployments)
Expand All @@ -1056,14 +1068,19 @@ def build_serve_application(
except KeyboardInterrupt:
# Error is raised when this task is canceled with ray.cancel(), which
# happens when deploy_apps() is called.
logger.info("Existing config deployment request terminated.")
logger.info(
"Existing config deployment request terminated because of keyboard "
"interrupt."
)
return None, None
except Exception:
logger.error(
f"Exception importing application '{name}'.\n{traceback.format_exc()}"
)
return None, traceback.format_exc()


def override_deployment_info(
app_name: str,
deployment_infos: Dict[str, DeploymentInfo],
override_config: Optional[ServeApplicationSchema],
) -> Dict[str, DeploymentInfo]:
Expand Down
5 changes: 4 additions & 1 deletion python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ async def __init__(

# Manage all applications' state
self.application_state_manager = ApplicationStateManager(
self.deployment_state_manager, self.endpoint_state, self.kv_store
self.deployment_state_manager,
self.endpoint_state,
self.kv_store,
self.global_logging_config,
)

# Controller actor details
Expand Down

0 comments on commit c09bd44

Please sign in to comment.