Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] configure logging for build app task #46347

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 41 additions & 24 deletions python/ray/serve/_private/application_state.py
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
import logging
import os
import time
import traceback
from copy import deepcopy
Expand Down Expand Up @@ -30,6 +31,7 @@
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve._private.deployment_state import DeploymentStateManager
from ray.serve._private.endpoint_state import EndpointState
from ray.serve._private.logging_utils import configure_component_logger
from ray.serve._private.storage.kv_store import KVStoreBase
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import (
Expand All @@ -40,7 +42,7 @@
from ray.serve.config import AutoscalingConfig
from ray.serve.exceptions import RayServeException
from ray.serve.generated.serve_pb2 import DeploymentLanguage
from ray.serve.schema import DeploymentDetails, ServeApplicationSchema
from ray.serve.schema import DeploymentDetails, LoggingConfig, ServeApplicationSchema
from ray.types import ObjectRef

logger = logging.getLogger(SERVE_LOGGER_NAME)
Expand Down Expand Up @@ -123,6 +125,7 @@ def __init__(
deployment_state_manager: DeploymentStateManager,
endpoint_state: EndpointState,
save_checkpoint_func: Callable,
logging_config: LoggingConfig,
):
"""
Args:
Expand All @@ -143,7 +146,7 @@ def __init__(
self._endpoint_state = endpoint_state
self._route_prefix: Optional[str] = None
self._docs_path: Optional[str] = None
self._ingress_deployment_name: str = None
self._ingress_deployment_name: Optional[str] = None

self._status: ApplicationStatus = ApplicationStatus.DEPLOYING
self._deployment_timestamp = time.time()
Expand All @@ -161,6 +164,7 @@ def __init__(
api_type=APIType.UNKNOWN,
)
self._save_checkpoint_func = save_checkpoint_func
self._logging_config = logging_config

@property
def route_prefix(self) -> Optional[str]:
Expand All @@ -186,7 +190,7 @@ def status(self) -> ApplicationStatus:
return self._status

@property
def deployment_timestamp(self) -> int:
def deployment_timestamp(self) -> float:
return self._deployment_timestamp

@property
Expand Down Expand Up @@ -225,7 +229,7 @@ def _set_target_state(
deployment_infos: Optional[Dict[str, DeploymentInfo]],
*,
api_type: APIType,
code_version: str,
code_version: Optional[str],
target_config: Optional[ServeApplicationSchema],
target_capacity: Optional[float] = None,
target_capacity_direction: Optional[TargetCapacityDirection] = None,
Expand Down Expand Up @@ -383,7 +387,7 @@ def apply_app_config(
config: ServeApplicationSchema,
target_capacity: Optional[float],
target_capacity_direction: Optional[TargetCapacityDirection],
deployment_time: int,
deployment_time: float,
) -> None:
"""Apply the config to the application.

Expand All @@ -402,7 +406,6 @@ def apply_app_config(
if config_version == self._target_state.code_version:
try:
overrided_infos = override_deployment_info(
self._name,
self._target_state.deployment_infos,
config,
)
Expand Down Expand Up @@ -455,10 +458,10 @@ def apply_app_config(
enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS,
).remote(
config.import_path,
config.deployment_names,
config_version,
config.name,
config.args,
self._logging_config,
)
self._build_app_task_info = BuildAppTaskInfo(
obj_ref=build_app_obj_ref,
Expand Down Expand Up @@ -524,7 +527,7 @@ def _determine_app_status(self) -> Tuple[ApplicationStatus, str]:
else:
return ApplicationStatus.RUNNING, ""

def _reconcile_build_app_task(self) -> Tuple[Tuple, BuildAppStatus, str]:
def _reconcile_build_app_task(self) -> Tuple[Optional[Dict], BuildAppStatus, str]:
"""If necessary, reconcile the in-progress build task.

Returns:
Expand Down Expand Up @@ -580,7 +583,7 @@ def _reconcile_build_app_task(self) -> Tuple[Tuple, BuildAppStatus, str]:
for params in args
}
overrided_infos = override_deployment_info(
self._name, deployment_infos, self._build_app_task_info.config
deployment_infos, self._build_app_task_info.config
)
self._route_prefix, self._docs_path = self._check_routes(overrided_infos)
return overrided_infos, BuildAppStatus.SUCCEEDED, ""
Expand Down Expand Up @@ -774,10 +777,12 @@ def __init__(
deployment_state_manager: DeploymentStateManager,
endpoint_state: EndpointState,
kv_store: KVStoreBase,
logging_config: LoggingConfig,
):
self._deployment_state_manager = deployment_state_manager
self._endpoint_state = endpoint_state
self._kv_store = kv_store
self._logging_config = logging_config
self._application_states: Dict[str, ApplicationState] = dict()
self._recover_from_checkpoint()

Expand All @@ -792,6 +797,7 @@ def _recover_from_checkpoint(self):
self._deployment_state_manager,
self._endpoint_state,
self._save_checkpoint_func,
self._logging_config,
)
app_state.recover_target_state_from_checkpoint(checkpoint_data)
self._application_states[app_name] = app_state
Expand Down Expand Up @@ -819,20 +825,20 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:

# Make sure route_prefix is not being used by other application.
live_route_prefixes: Dict[str, str] = {
self._application_states[app_name].route_prefix: app_name
app_state.route_prefix: app_name
for app_name, app_state in self._application_states.items()
if app_state.route_prefix is not None
and not app_state.status == ApplicationStatus.DELETING
and name != app_name
}

for deploy_param in deployment_args:
deploy_app_prefix = deploy_param.get("route_prefix")
if deploy_app_prefix in live_route_prefixes:
deploy_app_prefix: str = deploy_param["route_prefix"]
app_name = live_route_prefixes.get(deploy_app_prefix)
if app_name is not None:
raise RayServeException(
f"Prefix {deploy_app_prefix} is being used by application "
f'"{live_route_prefixes[deploy_app_prefix]}".'
f' Failed to deploy application "{name}".'
f'"{app_name}". Failed to deploy application "{name}".'
)

if name not in self._application_states:
Expand All @@ -841,6 +847,7 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
self._deployment_state_manager,
self._endpoint_state,
self._save_checkpoint_func,
self._logging_config,
)
ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

Expand Down Expand Up @@ -875,6 +882,7 @@ def apply_app_configs(
self._deployment_state_manager,
endpoint_state=self._endpoint_state,
save_checkpoint_func=self._save_checkpoint_func,
logging_config=self._logging_config,
)

self._application_states[app_config.name].apply_app_config(
Expand Down Expand Up @@ -1001,31 +1009,32 @@ def _save_checkpoint_func(
@ray.remote(num_cpus=0, max_calls=1)
def build_serve_application(
import_path: str,
config_deployments: List[str],
code_version: str,
name: str,
args: Dict,
) -> Tuple[List[Dict], Optional[str]]:
logging_config: LoggingConfig,
) -> Tuple[Optional[List[Dict]], Optional[str]]:
"""Import and build a Serve application.

Args:
import_path: import path to top-level bound deployment.
config_deployments: list of deployment names specified in config
with deployment override options. This is used to check that
all deployments specified in the config are valid.
code_version: code version inferred from app config. All
deployment versions are set to this code version.
name: application name. If specified, application will be deployed
without removing existing applications.
args: Arguments to be passed to the application builder.
logging_config: The application logging config, if deployment logging
config is not set, application logging config will be applied to the
deployment logging config.
logging_config: the logging config for the build app task.
Returns:
Deploy arguments: a list of deployment arguments if application
was built successfully, otherwise None.
Error message: a string if an error was raised, otherwise None.
"""
configure_component_logger(
component_name="controller",
component_id=f"build_{name}_{os.getpid()}",
logging_config=logging_config,
)

try:
from ray.serve._private.api import call_app_builder_with_args_if_necessary
from ray.serve._private.deployment_graph_build import build as pipeline_build
Expand All @@ -1034,6 +1043,9 @@ def build_serve_application(
)

# Import and build the application.
args_info_str = f" with arguments {args}" if args else ""
logger.info(f"Importing application '{name}'{args_info_str}.")

app = call_app_builder_with_args_if_necessary(import_attr(import_path), args)
deployments = pipeline_build(app._get_internal_dag_node(), name)
ingress = get_and_validate_ingress_deployment(deployments)
Expand All @@ -1056,14 +1068,19 @@ def build_serve_application(
except KeyboardInterrupt:
# Error is raised when this task is canceled with ray.cancel(), which
# happens when deploy_apps() is called.
logger.info("Existing config deployment request terminated.")
logger.info(
"Existing config deployment request terminated because of keyboard "
"interrupt."
)
return None, None
except Exception:
logger.error(
f"Exception importing application '{name}'.\n{traceback.format_exc()}"
)
return None, traceback.format_exc()


def override_deployment_info(
app_name: str,
deployment_infos: Dict[str, DeploymentInfo],
override_config: Optional[ServeApplicationSchema],
) -> Dict[str, DeploymentInfo]:
Expand Down
5 changes: 4 additions & 1 deletion python/ray/serve/_private/controller.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -185,7 +185,10 @@ async def __init__(

# Manage all applications' state
self.application_state_manager = ApplicationStateManager(
self.deployment_state_manager, self.endpoint_state, self.kv_store
self.deployment_state_manager,
self.endpoint_state,
self.kv_store,
self.global_logging_config,
)

# Controller actor details
Expand Down
Loading