[Serve] Separate the serve scheduling logic into its own class #36588

Merged · 25 commits · Jul 1, 2023
Changes from 15 commits
266 changes: 266 additions & 0 deletions python/ray/serve/_private/deployment_scheduler.py
@@ -0,0 +1,266 @@
import ray
from typing import Callable, Dict, Tuple, Optional, List, Union
from dataclasses import dataclass
from collections import defaultdict
from ray._raylet import GcsClient
from ray.serve._private.utils import get_all_node_ids
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


class SpreadDeploymentSchedulingPolicy:
    # Replicas are scheduled with Ray's "SPREAD" strategy.
    pass


class DriverDeploymentSchedulingPolicy:
    # At most one replica is scheduled per node via node-affinity scheduling.
    pass


@dataclass
class ReplicaSchedulingRequest:
    """Request to schedule a single replica.

    The scheduler is responsible for scheduling the replica
    based on the deployment scheduling policy.
    """

    deployment_name: str
    replica_name: str
    actor_def: ray.actor.ActorClass
    actor_resources: Dict
    actor_options: Dict
    actor_init_args: Tuple
    on_scheduled: Callable


@dataclass
class DeploymentDownscaleRequest:
    """Request to stop a certain number of replicas.

    The scheduler is responsible for choosing which replicas to stop.
    """

    deployment_name: str
    num_to_stop: int


class DeploymentScheduler:
    """A centralized scheduler for all Serve deployments.

    It makes scheduling decisions in batch mode for each update cycle.
    """

    def __init__(self, gcs_client: Optional[GcsClient] = None):
Contributor:
I'm hesitant to have the scheduler manage replica status, which duplicates DeploymentState's responsibility. I may be missing some context; can you help me understand why the scheduler needs to manage all the replica statuses? (I was thinking of letting DeploymentState call the scheduler to schedule and remove replicas.)

Collaborator (Author):
The deployment scheduler has its own state machine, which is different from the deployment state one: its states are based on whether we know the node id of the replica, since node id information is what matters for the scheduling implementation. For example, the deployment scheduler doesn't have an UPDATING state, because from the scheduler's point of view it's the same as RUNNING (i.e., the replica's node id is known).
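A minimal sketch of that state machine from a caller's side (the driver code and the deployment/replica names are hypothetical; DeploymentScheduler, its callbacks, and ReplicaSchedulingRequest are from this diff; a running Ray cluster is assumed):

import ray
from ray.serve._private.deployment_scheduler import (
    DeploymentScheduler,
    ReplicaSchedulingRequest,
    SpreadDeploymentSchedulingPolicy,
)

@ray.remote
class MyReplica:
    pass

ray.init()
scheduler = DeploymentScheduler()
scheduler.on_deployment_created("my_deployment", SpreadDeploymentSchedulingPolicy())

request = ReplicaSchedulingRequest(
    deployment_name="my_deployment",
    replica_name="replica-1",
    actor_def=MyReplica,
    actor_resources={"CPU": 1},
    actor_options={"name": "replica-1"},
    actor_init_args=(),
    on_scheduled=lambda actor_handle: None,
)

# Pending -> launching: schedule() submits the actor and invokes on_scheduled.
scheduler.schedule(upscales={"my_deployment": [request]}, downscales={})

# Launching -> running: the caller reports the node id once it is known.
scheduler.on_replica_running("my_deployment", "replica-1", node_id="node-1")

# Any state -> stopped: the replica is dropped from all internal maps.
scheduler.on_replica_stopping("my_deployment", "replica-1")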

        # {deployment_name: scheduling_policy}
        self._deployments = {}
        # Replicas that are pending to be scheduled.
        # {deployment_name: {replica_name: deployment_upscale_request}}
        self._pending_replicas = defaultdict(dict)
        # Replicas that are being scheduled.
        # The underlying actors have been submitted.
        # {deployment_name: {replica_name: target_node_id}}
        self._launching_replicas = defaultdict(dict)
        # Replicas that are recovering.
        # We don't know where those replicas are running.
        # {deployment_name: {replica_name}}
        self._recovering_replicas = defaultdict(set)
        # Replicas that are running.
        # We know where those replicas are running.
        # {deployment_name: {replica_name: running_node_id}}
        self._running_replicas = defaultdict(dict)

        if gcs_client:
            self._gcs_client = gcs_client
        else:
            self._gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)

    def on_deployment_created(
        self,
        deployment_name: str,
        scheduling_policy: Union[
            SpreadDeploymentSchedulingPolicy, DriverDeploymentSchedulingPolicy
        ],
    ):
        """This is called whenever a new deployment is created."""
        assert deployment_name not in self._pending_replicas
        assert deployment_name not in self._launching_replicas
        assert deployment_name not in self._recovering_replicas
        assert deployment_name not in self._running_replicas
        self._deployments[deployment_name] = scheduling_policy

    def on_deployment_deleted(self, deployment_name: str):
        """This is called whenever a deployment is deleted."""
        assert not self._pending_replicas[deployment_name]
        del self._pending_replicas[deployment_name]

        assert not self._launching_replicas[deployment_name]
        del self._launching_replicas[deployment_name]

        assert not self._recovering_replicas[deployment_name]
        del self._recovering_replicas[deployment_name]

        assert not self._running_replicas[deployment_name]
        del self._running_replicas[deployment_name]

        del self._deployments[deployment_name]

    def on_replica_stopping(self, deployment_name: str, replica_name: str):
        """This is called whenever a deployment replica is being stopped."""
        self._pending_replicas[deployment_name].pop(replica_name, None)
        self._launching_replicas[deployment_name].pop(replica_name, None)
        self._recovering_replicas[deployment_name].discard(replica_name)
        self._running_replicas[deployment_name].pop(replica_name, None)

    def on_replica_running(self, deployment_name: str, replica_name: str, node_id: str):
        """This is called whenever a deployment replica is running with a known node id."""
        assert replica_name not in self._pending_replicas[deployment_name]

        self._launching_replicas[deployment_name].pop(replica_name, None)
        self._recovering_replicas[deployment_name].discard(replica_name)

        self._running_replicas[deployment_name][replica_name] = node_id

    def on_replica_recovering(self, deployment_name: str, replica_name: str):
        """This is called whenever a deployment replica is recovering."""
        assert replica_name not in self._pending_replicas[deployment_name]
        assert replica_name not in self._launching_replicas[deployment_name]
        assert replica_name not in self._running_replicas[deployment_name]

        self._recovering_replicas[deployment_name].add(replica_name)

    def schedule(
        self,
        upscales: Dict[str, List[ReplicaSchedulingRequest]],
        downscales: Dict[str, DeploymentDownscaleRequest],
    ):
        """This is called for each update cycle to do batch scheduling.
Contributor:
Could you help me understand why we need to have self._pending_replicas? For the spread policy, is it always guaranteed that a scheduling request will be consumed inside the schedule() call?

Collaborator (Author):
You are right that for the spread policy, all the scheduling requests can be fulfilled immediately inside the schedule() call. But for the driver deployment policy that might not be the case: e.g., if there are recovering replicas, the scheduler won't schedule new replicas (to avoid placing multiple replicas on the same node) until a future schedule() call, once all replicas have recovered. (See the sketch after this method.)

        Args:
            upscales: a dict of deployment name to a list of replicas to schedule.
            downscales: a dict of deployment name to a downscale request.

        Returns:
            The replicas to stop for each deployment.
        """
        for upscale in upscales.values():
            for replica_scheduling_request in upscale:
                self._pending_replicas[replica_scheduling_request.deployment_name][
                    replica_scheduling_request.replica_name
                ] = replica_scheduling_request

        for deployment_name, pending_replicas in self._pending_replicas.items():
            if not pending_replicas:
                continue

            deployment_scheduling_policy = self._deployments[deployment_name]
            if isinstance(
                deployment_scheduling_policy, SpreadDeploymentSchedulingPolicy
            ):
                self._schedule_spread_deployment(deployment_name)
            else:
                assert isinstance(
                    deployment_scheduling_policy, DriverDeploymentSchedulingPolicy
                )
                self._schedule_driver_deployment(deployment_name)

        deployment_to_replicas_to_stop = {}
        for downscale in downscales.values():
            deployment_to_replicas_to_stop[
                downscale.deployment_name
            ] = self._get_replicas_to_stop(
                downscale.deployment_name, downscale.num_to_stop
            )

        return deployment_to_replicas_to_stop
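A short sketch of one update cycle from the caller's side (the deployment and replica names are hypothetical; request_1 and request_2 stand for ReplicaSchedulingRequests built as in the earlier sketch):

upscales = {"deployment_a": [request_1, request_2]}
downscales = {
    "deployment_b": DeploymentDownscaleRequest(
        deployment_name="deployment_b", num_to_stop=1
    )
}

replicas_to_stop = scheduler.schedule(upscales, downscales)
# e.g. {"deployment_b": {"replica-7"}}; the caller is expected to stop these
# replicas and then call on_replica_stopping() for each of them.

# For a driver deployment with recovering replicas, the upscale requests stay
# in self._pending_replicas and are retried on the next schedule() call.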

    def _schedule_spread_deployment(self, deployment_name):
        for pending_replica_name in list(
            self._pending_replicas[deployment_name].keys()
        ):
            replica_scheduling_request = self._pending_replicas[deployment_name][
                pending_replica_name
            ]

            actor_handle = replica_scheduling_request.actor_def.options(
                scheduling_strategy="SPREAD",
                **replica_scheduling_request.actor_options,
            ).remote(*replica_scheduling_request.actor_init_args)
            del self._pending_replicas[deployment_name][pending_replica_name]
            self._launching_replicas[deployment_name][pending_replica_name] = None
            replica_scheduling_request.on_scheduled(actor_handle)

    def _schedule_driver_deployment(self, deployment_name):
        if self._recovering_replicas[deployment_name]:
            # Wait until recovery is done before scheduling new replicas
            # so that we don't schedule two replicas on the same node.
            return

        all_nodes = {node_id for node_id, _ in get_all_node_ids(self._gcs_client)}
Contributor:
Why is the custom gcs_client necessary here?

Collaborator (Author):
Because get_all_node_ids needs a gcs_client as an argument.

        scheduled_nodes = set()
        for node_id in self._launching_replicas[deployment_name].values():
            assert node_id is not None
            scheduled_nodes.add(node_id)
        for node_id in self._running_replicas[deployment_name].values():
            assert node_id is not None
            scheduled_nodes.add(node_id)
        unscheduled_nodes = all_nodes - scheduled_nodes

        for pending_replica_name in list(
            self._pending_replicas[deployment_name].keys()
        ):
            if not unscheduled_nodes:
                return

            replica_scheduling_request = self._pending_replicas[deployment_name][
                pending_replica_name
            ]

            target_node_id = unscheduled_nodes.pop()
            actor_handle = replica_scheduling_request.actor_def.options(
                scheduling_strategy=NodeAffinitySchedulingStrategy(
                    target_node_id, soft=False
                ),
                **replica_scheduling_request.actor_options,
            ).remote(*replica_scheduling_request.actor_init_args)
            del self._pending_replicas[deployment_name][pending_replica_name]
            self._launching_replicas[deployment_name][
                pending_replica_name
            ] = target_node_id
            replica_scheduling_request.on_scheduled(actor_handle)

    def _get_replicas_to_stop(self, deployment_name, max_num_to_stop):
        """Prioritize replicas that have the fewest copies on a node.

        This algorithm helps to scale down more intelligently because it
        can relinquish nodes faster. Note that it doesn't consider other
        deployments or other actors on the same node. See more at
        https://github.com/ray-project/ray/issues/20599.
        """
        replicas_to_stop = set()

Contributor:
This might be a behavior change here. Why do we want to remove the non-running replicas first? It doesn't seem aligned with "Prioritize replicas that have the fewest copies on a node" in the docstring.

Collaborator (Author):
It's the same behavior: the code prioritizes replicas without a node id first. Comment from the original code:

# Replicas not in running state might have _node_id = None.
# We will prioritize those first.

        pending_launching_recovering_replicas = set().union(
            self._pending_replicas[deployment_name].keys(),
            self._launching_replicas[deployment_name].keys(),
            self._recovering_replicas[deployment_name],
        )
        for (
            pending_launching_recovering_replica
        ) in pending_launching_recovering_replicas:
            if len(replicas_to_stop) == max_num_to_stop:
                return replicas_to_stop
            else:
                replicas_to_stop.add(pending_launching_recovering_replica)

        node_to_running_replicas = defaultdict(set)
        for running_replica, node_id in self._running_replicas[deployment_name].items():
            node_to_running_replicas[node_id].add(running_replica)
        for running_replicas in sorted(
            node_to_running_replicas.values(), key=lambda lst: len(lst)
        ):
            for running_replica in running_replicas:
                if len(replicas_to_stop) == max_num_to_stop:
                    return replicas_to_stop
                else:
                    replicas_to_stop.add(running_replica)

        return replicas_to_stop
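To illustrate the prioritization with a hypothetical state (replica and node names are made up):

# Suppose a deployment currently has:
#   launching: {"r1": None}                  (node id not yet known)
#   running:   {"r2": "node-A", "r3": "node-A", "r4": "node-B"}
#
# _get_replicas_to_stop(deployment, max_num_to_stop=2) first picks "r1",
# since replicas without a known node id are prioritized. It then sorts
# nodes by replica count and picks "r4" from node-B (1 replica) before
# anything on node-A (2 replicas), so stopping "r4" frees node-B entirely.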