Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Additional metadata and context #47652

Merged
merged 10 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,13 @@ def from_proto(cls, proto: StatusOverviewProto) -> "StatusOverview":
class RunningReplicaInfo:
replica_id: ReplicaID
node_id: Optional[str]
node_ip: Optional[str]
availability_zone: Optional[str]
actor_handle: ActorHandle
max_ongoing_requests: int
is_cross_language: bool = False
multiplexed_model_ids: List[str] = field(default_factory=list)
port: Optional[int] = None

def __post_init__(self):
# Set hash value when object is constructed.
Expand Down
11 changes: 8 additions & 3 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,9 +674,12 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
# This should only update version if the replica is being recovered.
# If this is checking on a replica that is newly started, this
# should return a version that is identical to what's already stored
_, self._version, self._initialization_latency_s = ray.get(
self._ready_obj_ref
)
(
_,
self._version,
self._initialization_latency_s,
self._port,
) = ray.get(self._ready_obj_ref)
except RayTaskError as e:
logger.exception(
f"Exception in {self._replica_id}, the replica will be stopped."
Expand Down Expand Up @@ -904,11 +907,13 @@ def get_running_replica_info(
return RunningReplicaInfo(
replica_id=self._replica_id,
node_id=self.actor_node_id,
node_ip=self._actor.node_ip,
availability_zone=cluster_node_info_cache.get_node_az(self.actor_node_id),
actor_handle=self._actor.actor_handle,
max_ongoing_requests=self._actor.max_ongoing_requests,
is_cross_language=self._actor.is_cross_language,
multiplexed_model_ids=self.multiplexed_model_ids,
port=self._actor._port,
)

def record_multiplexed_model_ids(self, multiplexed_model_ids: List[str]):
Expand Down
1 change: 1 addition & 0 deletions python/ray/serve/_private/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ def setup_request_context_and_handle(
"route": route_path,
"app_name": app_name,
"_internal_request_id": internal_request_id,
"is_http_request": True,
}
for key, value in proxy_request.headers:
if key.decode() == SERVE_MULTIPLEXED_MODEL_ID:
Expand Down
9 changes: 6 additions & 3 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ async def __init__(
self._deployment_config.autoscaling_config,
)

self._port: Optional[int] = None

def _set_internal_replica_context(self, *, servable_object: Callable = None):
ray.serve.context._set_internal_replica_context(
replica_id=self._replica_id,
Expand Down Expand Up @@ -593,7 +595,7 @@ async def initialize_and_get_metadata(
self,
deployment_config: DeploymentConfig = None,
_after: Optional[Any] = None,
) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float]]:
) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float], Optional[int]]:
"""Handles initializing the replica.

Returns: 3-tuple containing
Expand Down Expand Up @@ -636,7 +638,7 @@ async def initialize_and_get_metadata(
async def reconfigure(
self,
deployment_config: DeploymentConfig,
) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float]]:
) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float], Optional[int]]:
try:
user_config_changed = (
deployment_config.user_config != self._deployment_config.user_config
Expand Down Expand Up @@ -673,11 +675,12 @@ async def reconfigure(

def _get_metadata(
self,
) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float]]:
) -> Tuple[DeploymentConfig, DeploymentVersion, Optional[float], Optional[int]]:
GeneDer marked this conversation as resolved.
Show resolved Hide resolved
return (
self._version.deployment_config,
self._version,
self._initialization_latency,
self._port,
)

def _save_cpu_profile_data(self) -> str:
Expand Down
1 change: 1 addition & 0 deletions python/ray/serve/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class _RequestContext:
app_name: str = ""
multiplexed_model_id: str = ""
grpc_context: Optional[RayServegRPCContext] = None
is_http_request: bool = False


_serve_request_context = contextvars.ContextVar(
Expand Down
1 change: 1 addition & 0 deletions python/ray/serve/tests/test_actor_replica_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def setup_fake_replica(ray_instance) -> Tuple[ActorReplicaWrapper, ActorHandle]:
"fake_replica", deployment_id=DeploymentID(name="fake_deployment")
),
node_id=None,
node_ip=None,
availability_zone=None,
actor_handle=actor_handle,
max_ongoing_requests=10,
Expand Down
1 change: 1 addition & 0 deletions python/ray/serve/tests/test_long_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ def test_listen_for_change_java(serve_instance):
str(i), deployment_id=DeploymentID(name="deployment_name")
),
node_id="node_id",
node_ip="node_ip",
availability_zone="some-az",
actor_handle=host,
max_ongoing_requests=1,
Expand Down
12 changes: 9 additions & 3 deletions python/ray/serve/tests/unit/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,15 @@ def __init__(self, actor_id):
fake_h2 = FakeActorHandler("1")
replica_id = ReplicaID("asdf123", deployment_id=DeploymentID(name="my_deployment"))
assert fake_h1 != fake_h2
replica1 = RunningReplicaInfo(replica_id, "node_id", "some-az", fake_h1, 1, False)
replica2 = RunningReplicaInfo(replica_id, "node_id", "some-az", fake_h2, 1, False)
replica3 = RunningReplicaInfo(replica_id, "node_id", "some-az", fake_h2, 1, True)
replica1 = RunningReplicaInfo(
replica_id, "node_id", "node_ip", "some-az", fake_h1, 1, False
)
replica2 = RunningReplicaInfo(
replica_id, "node_id", "node_ip", "some-az", fake_h2, 1, False
)
replica3 = RunningReplicaInfo(
replica_id, "node_id", "node_ip", "some-az", fake_h2, 1, True
)
assert replica1._hash == replica2._hash
assert replica3._hash != replica1._hash

Expand Down
1 change: 1 addition & 0 deletions python/ray/serve/tests/unit/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ def running_replica_info(replica_id: ReplicaID) -> RunningReplicaInfo:
return RunningReplicaInfo(
replica_id=replica_id,
node_id="node_id",
node_ip="node_ip",
availability_zone="some-az",
actor_handle=Mock(),
max_ongoing_requests=1,
Expand Down