diff --git a/python/ray/serve/_private/common.py b/python/ray/serve/_private/common.py index e6f04723258b0..0277b8e489238 100644 --- a/python/ray/serve/_private/common.py +++ b/python/ray/serve/_private/common.py @@ -577,11 +577,13 @@ def from_proto(cls, proto: StatusOverviewProto) -> "StatusOverview": class RunningReplicaInfo: replica_id: ReplicaID node_id: Optional[str] + node_ip: Optional[str] availability_zone: Optional[str] actor_handle: ActorHandle max_ongoing_requests: int is_cross_language: bool = False multiplexed_model_ids: List[str] = field(default_factory=list) + port: Optional[int] = None def __post_init__(self): # Set hash value when object is constructed. diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index c93415ab310b5..69ac41d68102e 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -242,6 +242,7 @@ def __init__( self._last_health_check_time: float = 0.0 self._consecutive_health_check_failures = 0 self._initialization_latency_s: Optional[float] = None + self._port: Optional[int] = None # Populated in `on_scheduled` or `recover`. self._actor_handle: ActorHandle = None @@ -674,9 +675,12 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]: # This should only update version if the replica is being recovered. # If this is checking on a replica that is newly started, this # should return a version that is identical to what's already stored - _, self._version, self._initialization_latency_s = ray.get( - self._ready_obj_ref - ) + ( + _, + self._version, + self._initialization_latency_s, + self._port, + ) = ray.get(self._ready_obj_ref) except RayTaskError as e: logger.exception( f"Exception in {self._replica_id}, the replica will be stopped." @@ -904,11 +908,13 @@ def get_running_replica_info( return RunningReplicaInfo( replica_id=self._replica_id, node_id=self.actor_node_id, + node_ip=self._actor.node_ip, availability_zone=cluster_node_info_cache.get_node_az(self.actor_node_id), actor_handle=self._actor.actor_handle, max_ongoing_requests=self._actor.max_ongoing_requests, is_cross_language=self._actor.is_cross_language, multiplexed_model_ids=self.multiplexed_model_ids, + port=self._actor._port, ) def record_multiplexed_model_ids(self, multiplexed_model_ids: List[str]): diff --git a/python/ray/serve/_private/proxy.py b/python/ray/serve/_private/proxy.py index ffb3edf064333..157855c549895 100644 --- a/python/ray/serve/_private/proxy.py +++ b/python/ray/serve/_private/proxy.py @@ -913,6 +913,7 @@ def setup_request_context_and_handle( "route": route_path, "app_name": app_name, "_internal_request_id": internal_request_id, + "is_http_request": True, } for key, value in proxy_request.headers: if key.decode() == SERVE_MULTIPLEXED_MODEL_ID: diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index 122c27a965b41..246137f89225e 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -289,6 +289,8 @@ async def __init__( self._deployment_config.autoscaling_config, ) + self._port: Optional[int] = None + def _set_internal_replica_context(self, *, servable_object: Callable = None): ray.serve.context._set_internal_replica_context( replica_id=self._replica_id, @@ -593,7 +595,7 @@ async def initialize_and_get_metadata( self, deployment_config: DeploymentConfig = None, _after: Optional[Any] = None, - ) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float]]: + ) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float], Optional[int]]: """Handles initializing the replica. Returns: 3-tuple containing @@ -636,7 +638,7 @@ async def initialize_and_get_metadata( async def reconfigure( self, deployment_config: DeploymentConfig, - ) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float]]: + ) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float], Optional[int]]: try: user_config_changed = ( deployment_config.user_config != self._deployment_config.user_config @@ -673,11 +675,12 @@ async def reconfigure( def _get_metadata( self, - ) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float]]: + ) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float], Optional[int]]: return ( self._version.deployment_config, self._version, self._initialization_latency, + self._port, ) def _save_cpu_profile_data(self) -> str: diff --git a/python/ray/serve/context.py b/python/ray/serve/context.py index 5fc7b0cc72a7b..32b56f8ffce1f 100644 --- a/python/ray/serve/context.py +++ b/python/ray/serve/context.py @@ -175,6 +175,7 @@ class _RequestContext: app_name: str = "" multiplexed_model_id: str = "" grpc_context: Optional[RayServegRPCContext] = None + is_http_request: bool = False _serve_request_context = contextvars.ContextVar( diff --git a/python/ray/serve/tests/test_actor_replica_wrapper.py b/python/ray/serve/tests/test_actor_replica_wrapper.py index 2400a79ead2f3..6bbe8244a2643 100644 --- a/python/ray/serve/tests/test_actor_replica_wrapper.py +++ b/python/ray/serve/tests/test_actor_replica_wrapper.py @@ -94,6 +94,7 @@ def setup_fake_replica(ray_instance) -> Tuple[ActorReplicaWrapper, ActorHandle]: "fake_replica", deployment_id=DeploymentID(name="fake_deployment") ), node_id=None, + node_ip=None, availability_zone=None, actor_handle=actor_handle, max_ongoing_requests=10, diff --git a/python/ray/serve/tests/test_controller_recovery.py b/python/ray/serve/tests/test_controller_recovery.py index eae85b20cad4e..0042323221b38 100644 --- a/python/ray/serve/tests/test_controller_recovery.py +++ b/python/ray/serve/tests/test_controller_recovery.py @@ -65,7 +65,7 @@ def __call__(self, *args): replica_version_hash = None for replica in deployment_dict[id]: ref = replica.actor_handle._get_metadata.remote() - _, version, _ = ray.get(ref) + _, version, _, _ = ray.get(ref) if replica_version_hash is None: replica_version_hash = hash(version) assert replica_version_hash == hash(version), ( @@ -117,7 +117,7 @@ def __call__(self, *args): for replica_name in recovered_replica_names: actor_handle = ray.get_actor(replica_name, namespace=SERVE_NAMESPACE) ref = actor_handle._get_metadata.remote() - _, version, _ = ray.get(ref) + _, version, _, _ = ray.get(ref) assert replica_version_hash == hash( version ), "Replica version hash should be the same after recover from actor names" diff --git a/python/ray/serve/tests/test_long_poll.py b/python/ray/serve/tests/test_long_poll.py index 1ab0028c550f1..86bf03880e333 100644 --- a/python/ray/serve/tests/test_long_poll.py +++ b/python/ray/serve/tests/test_long_poll.py @@ -231,6 +231,7 @@ def test_listen_for_change_java(serve_instance): str(i), deployment_id=DeploymentID(name="deployment_name") ), node_id="node_id", + node_ip="node_ip", availability_zone="some-az", actor_handle=host, max_ongoing_requests=1, diff --git a/python/ray/serve/tests/unit/test_common.py b/python/ray/serve/tests/unit/test_common.py index 2f4565e82e127..be0a3a63b7015 100644 --- a/python/ray/serve/tests/unit/test_common.py +++ b/python/ray/serve/tests/unit/test_common.py @@ -261,9 +261,15 @@ def __init__(self, actor_id): fake_h2 = FakeActorHandler("1") replica_id = ReplicaID("asdf123", deployment_id=DeploymentID(name="my_deployment")) assert fake_h1 != fake_h2 - replica1 = RunningReplicaInfo(replica_id, "node_id", "some-az", fake_h1, 1, False) - replica2 = RunningReplicaInfo(replica_id, "node_id", "some-az", fake_h2, 1, False) - replica3 = RunningReplicaInfo(replica_id, "node_id", "some-az", fake_h2, 1, True) + replica1 = RunningReplicaInfo( + replica_id, "node_id", "node_ip", "some-az", fake_h1, 1, False + ) + replica2 = RunningReplicaInfo( + replica_id, "node_id", "node_ip", "some-az", fake_h2, 1, False + ) + replica3 = RunningReplicaInfo( + replica_id, "node_id", "node_ip", "some-az", fake_h2, 1, True + ) assert replica1._hash == replica2._hash assert replica3._hash != replica1._hash diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index 2c9184e2936d5..8cddd0cbb371a 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -91,8 +91,10 @@ def __init__( self._is_cross_language = False self._actor_handle = MockActorHandle() self._node_id = None + self._node_ip = None self._node_id_is_set = False self._actor_id = None + self._port = None self._pg_bundles = None self._initialization_latency_s = -1 @@ -160,6 +162,10 @@ def node_ip(self) -> Optional[str]: def log_file_path(self) -> Optional[str]: return None + @property + def grpc_port(self) -> Optional[int]: + return None + @property def placement_group_bundles(self) -> Optional[List[Dict[str, float]]]: return None diff --git a/python/ray/serve/tests/unit/test_router.py b/python/ray/serve/tests/unit/test_router.py index 7bd1f5d8896ef..78f23c432fa86 100644 --- a/python/ray/serve/tests/unit/test_router.py +++ b/python/ray/serve/tests/unit/test_router.py @@ -634,6 +634,7 @@ def running_replica_info(replica_id: ReplicaID) -> RunningReplicaInfo: return RunningReplicaInfo( replica_id=replica_id, node_id="node_id", + node_ip="node_ip", availability_zone="some-az", actor_handle=Mock(), max_ongoing_requests=1,