diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index fda4865b78d5..13e42ee1b1e6 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -1,4 +1,5 @@ import logging +import os import time import traceback from copy import deepcopy @@ -30,6 +31,7 @@ from ray.serve._private.deployment_info import DeploymentInfo from ray.serve._private.deployment_state import DeploymentStateManager from ray.serve._private.endpoint_state import EndpointState +from ray.serve._private.logging_utils import configure_component_logger from ray.serve._private.storage.kv_store import KVStoreBase from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import ( @@ -40,7 +42,7 @@ from ray.serve.config import AutoscalingConfig from ray.serve.exceptions import RayServeException from ray.serve.generated.serve_pb2 import DeploymentLanguage -from ray.serve.schema import DeploymentDetails, ServeApplicationSchema +from ray.serve.schema import DeploymentDetails, LoggingConfig, ServeApplicationSchema from ray.types import ObjectRef logger = logging.getLogger(SERVE_LOGGER_NAME) @@ -123,6 +125,7 @@ def __init__( deployment_state_manager: DeploymentStateManager, endpoint_state: EndpointState, save_checkpoint_func: Callable, + logging_config: LoggingConfig, ): """ Args: @@ -143,7 +146,7 @@ def __init__( self._endpoint_state = endpoint_state self._route_prefix: Optional[str] = None self._docs_path: Optional[str] = None - self._ingress_deployment_name: str = None + self._ingress_deployment_name: Optional[str] = None self._status: ApplicationStatus = ApplicationStatus.DEPLOYING self._deployment_timestamp = time.time() @@ -161,6 +164,7 @@ def __init__( api_type=APIType.UNKNOWN, ) self._save_checkpoint_func = save_checkpoint_func + self._logging_config = logging_config @property def route_prefix(self) -> Optional[str]: @@ -186,7 +190,7 @@ def status(self) -> ApplicationStatus: return self._status @property - def deployment_timestamp(self) -> int: + def deployment_timestamp(self) -> float: return self._deployment_timestamp @property @@ -225,7 +229,7 @@ def _set_target_state( deployment_infos: Optional[Dict[str, DeploymentInfo]], *, api_type: APIType, - code_version: str, + code_version: Optional[str], target_config: Optional[ServeApplicationSchema], target_capacity: Optional[float] = None, target_capacity_direction: Optional[TargetCapacityDirection] = None, @@ -383,7 +387,7 @@ def apply_app_config( config: ServeApplicationSchema, target_capacity: Optional[float], target_capacity_direction: Optional[TargetCapacityDirection], - deployment_time: int, + deployment_time: float, ) -> None: """Apply the config to the application. @@ -402,7 +406,6 @@ def apply_app_config( if config_version == self._target_state.code_version: try: overrided_infos = override_deployment_info( - self._name, self._target_state.deployment_infos, config, ) @@ -455,10 +458,10 @@ def apply_app_config( enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS, ).remote( config.import_path, - config.deployment_names, config_version, config.name, config.args, + self._logging_config, ) self._build_app_task_info = BuildAppTaskInfo( obj_ref=build_app_obj_ref, @@ -524,7 +527,7 @@ def _determine_app_status(self) -> Tuple[ApplicationStatus, str]: else: return ApplicationStatus.RUNNING, "" - def _reconcile_build_app_task(self) -> Tuple[Tuple, BuildAppStatus, str]: + def _reconcile_build_app_task(self) -> Tuple[Optional[Dict], BuildAppStatus, str]: """If necessary, reconcile the in-progress build task. Returns: @@ -580,7 +583,7 @@ def _reconcile_build_app_task(self) -> Tuple[Tuple, BuildAppStatus, str]: for params in args } overrided_infos = override_deployment_info( - self._name, deployment_infos, self._build_app_task_info.config + deployment_infos, self._build_app_task_info.config ) self._route_prefix, self._docs_path = self._check_routes(overrided_infos) return overrided_infos, BuildAppStatus.SUCCEEDED, "" @@ -774,10 +777,12 @@ def __init__( deployment_state_manager: DeploymentStateManager, endpoint_state: EndpointState, kv_store: KVStoreBase, + logging_config: LoggingConfig, ): self._deployment_state_manager = deployment_state_manager self._endpoint_state = endpoint_state self._kv_store = kv_store + self._logging_config = logging_config self._application_states: Dict[str, ApplicationState] = dict() self._recover_from_checkpoint() @@ -792,6 +797,7 @@ def _recover_from_checkpoint(self): self._deployment_state_manager, self._endpoint_state, self._save_checkpoint_func, + self._logging_config, ) app_state.recover_target_state_from_checkpoint(checkpoint_data) self._application_states[app_name] = app_state @@ -819,7 +825,7 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None: # Make sure route_prefix is not being used by other application. live_route_prefixes: Dict[str, str] = { - self._application_states[app_name].route_prefix: app_name + app_state.route_prefix: app_name for app_name, app_state in self._application_states.items() if app_state.route_prefix is not None and not app_state.status == ApplicationStatus.DELETING @@ -827,12 +833,12 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None: } for deploy_param in deployment_args: - deploy_app_prefix = deploy_param.get("route_prefix") - if deploy_app_prefix in live_route_prefixes: + deploy_app_prefix: str = deploy_param["route_prefix"] + app_name = live_route_prefixes.get(deploy_app_prefix) + if app_name is not None: raise RayServeException( f"Prefix {deploy_app_prefix} is being used by application " - f'"{live_route_prefixes[deploy_app_prefix]}".' - f' Failed to deploy application "{name}".' + f'"{app_name}". Failed to deploy application "{name}".' ) if name not in self._application_states: @@ -841,6 +847,7 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None: self._deployment_state_manager, self._endpoint_state, self._save_checkpoint_func, + self._logging_config, ) ServeUsageTag.NUM_APPS.record(str(len(self._application_states))) @@ -875,6 +882,7 @@ def apply_app_configs( self._deployment_state_manager, endpoint_state=self._endpoint_state, save_checkpoint_func=self._save_checkpoint_func, + logging_config=self._logging_config, ) self._application_states[app_config.name].apply_app_config( @@ -1001,31 +1009,32 @@ def _save_checkpoint_func( @ray.remote(num_cpus=0, max_calls=1) def build_serve_application( import_path: str, - config_deployments: List[str], code_version: str, name: str, args: Dict, -) -> Tuple[List[Dict], Optional[str]]: + logging_config: LoggingConfig, +) -> Tuple[Optional[List[Dict]], Optional[str]]: """Import and build a Serve application. Args: import_path: import path to top-level bound deployment. - config_deployments: list of deployment names specified in config - with deployment override options. This is used to check that - all deployments specified in the config are valid. code_version: code version inferred from app config. All deployment versions are set to this code version. name: application name. If specified, application will be deployed without removing existing applications. args: Arguments to be passed to the application builder. - logging_config: The application logging config, if deployment logging - config is not set, application logging config will be applied to the - deployment logging config. + logging_config: the logging config for the build app task. Returns: Deploy arguments: a list of deployment arguments if application was built successfully, otherwise None. Error message: a string if an error was raised, otherwise None. """ + configure_component_logger( + component_name="controller", + component_id=f"build_{name}_{os.getpid()}", + logging_config=logging_config, + ) + try: from ray.serve._private.api import call_app_builder_with_args_if_necessary from ray.serve._private.deployment_graph_build import build as pipeline_build @@ -1034,6 +1043,9 @@ def build_serve_application( ) # Import and build the application. + args_info_str = f" with arguments {args}" if args else "" + logger.info(f"Importing application '{name}'{args_info_str}.") + app = call_app_builder_with_args_if_necessary(import_attr(import_path), args) deployments = pipeline_build(app._get_internal_dag_node(), name) ingress = get_and_validate_ingress_deployment(deployments) @@ -1056,14 +1068,19 @@ def build_serve_application( except KeyboardInterrupt: # Error is raised when this task is canceled with ray.cancel(), which # happens when deploy_apps() is called. - logger.info("Existing config deployment request terminated.") + logger.info( + "Existing config deployment request terminated because of keyboard " + "interrupt." + ) return None, None except Exception: + logger.error( + f"Exception importing application '{name}'.\n{traceback.format_exc()}" + ) return None, traceback.format_exc() def override_deployment_info( - app_name: str, deployment_infos: Dict[str, DeploymentInfo], override_config: Optional[ServeApplicationSchema], ) -> Dict[str, DeploymentInfo]: diff --git a/python/ray/serve/_private/controller.py b/python/ray/serve/_private/controller.py index 600cb2ef2b63..e09e8098f051 100644 --- a/python/ray/serve/_private/controller.py +++ b/python/ray/serve/_private/controller.py @@ -185,7 +185,10 @@ async def __init__( # Manage all applications' state self.application_state_manager = ApplicationStateManager( - self.deployment_state_manager, self.endpoint_state, self.kv_store + self.deployment_state_manager, + self.endpoint_state, + self.kv_store, + self.global_logging_config, ) # Controller actor details