[Serve] Separate the serve scheduling logic into its own class #36588
Conversation
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
The PR is ready for a first round of review. In the meantime, I'll fix and add more tests.
# Check if the model_id has changed.
running_replicas_changed |= self._multiplexed_model_ids_updated
self._check_and_update_replicas()
I flipped the order between _check_and_update_replicas and _scale_deployment_replicas since I think we should first refresh the status of existing replicas and then scale based on the latest status. For example, it's possible that _check_and_update_replicas stopped a replica so that _scale_deployment_replicas needs to start a new one.
That should be fine; I don't think this ordering is important. The only downside is that it may take an extra iteration for a deployment to initially go from starting -> running.
Yea, that's true if starting/initialization is super fast.
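For context, a minimal sketch of the ordering being discussed; the class and method bodies below are illustrative stand-ins, not the actual DeploymentState implementation:

```python
class DeploymentStateSketch:
    """Illustrative stand-in for a deployment's per-update-cycle logic."""

    def update(self):
        # Refresh the status of existing replicas first, so that a replica
        # that just died or finished stopping is reflected before we decide
        # how to scale.
        self._check_and_update_replicas()
        # Then scale based on the freshest view. If the refresh above stopped
        # a replica, this step can request a replacement in the same iteration
        # instead of waiting for the next control-loop pass.
        return self._scale_deployment_replicas()

    def _check_and_update_replicas(self):
        ...

    def _scale_deployment_replicas(self):
        ...
```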
self._deployment_states[deployment_name].stop_replicas(replicas_to_stop)

for deployment_name, deployment_state in self._deployment_states.items():
    if set(running_replica_infos_before_update[deployment_name]) != set(
We can just decide whether to notify by comparing before and after.
Approach looks good for a first cut
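A rough sketch of that before/after comparison; the helper names (get_running_replica_infos, notify_changed) are used here illustratively, not as the exact controller code:

```python
def update_and_notify(deployment_states, long_poll_host):
    """Sketch: only notify listeners when the running replica set changed."""
    # Snapshot running replicas before the update cycle.
    before = {
        name: set(state.get_running_replica_infos())
        for name, state in deployment_states.items()
    }

    for state in deployment_states.values():
        state.update()

    for name, state in deployment_states.items():
        after = set(state.get_running_replica_infos())
        # Compare the snapshot taken before the update cycle with the state
        # afterwards; skip the long-poll broadcast when nothing changed.
        if before[name] != after:
            long_poll_host.notify_changed(("running_replicas", name), list(after))
```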
upscales: Dict[str, List[ReplicaSchedulingRequest]],
downscales: Dict[str, DeploymentDownscaleRequest],
):
    """This is called for each update cycle to do batch scheduling.
Could you help me understand why we need to have self._pending_replicas? For the spread policy, is it always guaranteed that a scheduling request will be consumed inside the schedule() call?
You are right that for the spread policy, all the scheduling requests can be fulfilled immediately inside the schedule() call. But for the driver deployment policy, that might not be the case: e.g., if there are recovering replicas, the scheduler won't schedule new replicas (to avoid multiple replicas on the same node) until a future schedule() call when all replicas are recovered.
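To illustrate why pending requests can outlive a single call, here is a hedged sketch; the policy helpers and attribute names are assumptions for the example, not the real scheduler API:

```python
from collections import defaultdict


class DeploymentSchedulerSketch:
    """Illustrative only: pending requests may survive a schedule() call."""

    def __init__(self):
        # deployment name -> replica name -> scheduling request not yet placed.
        self._pending_replicas = defaultdict(dict)
        # deployment name -> replicas still recovering (node id unknown).
        self._recovering_replicas = defaultdict(set)

    def schedule(self, upscales, downscales):
        # Downscales are handled separately and omitted from this sketch.
        for deployment_name, requests in upscales.items():
            for request in requests:
                self._pending_replicas[deployment_name][request.replica_name] = request

        for deployment_name in list(self._pending_replicas):
            if self._uses_spread_policy(deployment_name):
                # Spread policy: every pending request can be launched now,
                # so nothing is left pending after this call.
                self._launch_all_pending(deployment_name)
            elif not self._recovering_replicas[deployment_name]:
                # Driver-deployment-style policy: only place replicas when no
                # replica is recovering, so two replicas never end up on the
                # same node. Otherwise the requests stay in
                # self._pending_replicas until a later schedule() call.
                self._launch_one_per_node(deployment_name)

    # Placeholders so the sketch stays short.
    def _uses_spread_policy(self, name): ...
    def _launch_all_pending(self, name): ...
    def _launch_one_per_node(self, name): ...
```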
""" | ||
replicas_to_stop = set() | ||
|
||
pending_launching_recovering_replicas = set().union( |
This might be a behavior change. Why do we want to remove the non-running replicas first? I think it doesn't align with "Prioritize replicas that have fewest copies on a node." in the comments.
It's the same behavior: the code will try to prioritize replicas without a node id first. Comment from the original code:
# Replicas not in running state might have _node_id = None.
# We will prioritize those first.
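A hedged sketch of that prioritization; the data shapes are assumed for illustration:

```python
from collections import defaultdict


def choose_replicas_to_stop(replicas, num_to_stop):
    """Sketch: pick replicas to stop during a downscale.

    `replicas` is a list of (replica_id, node_id_or_None) pairs. Replicas
    whose node id is unknown (not running yet) are stopped first; the rest
    are ordered so replicas on nodes holding the fewest copies of this
    deployment go first, matching the comment quoted above.
    """
    without_node = [replica for replica, node in replicas if node is None]
    with_node = [(replica, node) for replica, node in replicas if node is not None]

    copies_per_node = defaultdict(int)
    for _, node in with_node:
        copies_per_node[node] += 1

    ranked = sorted(with_node, key=lambda pair: copies_per_node[pair[1]])
    ordered = without_node + [replica for replica, _ in ranked]
    return set(ordered[:num_to_stop])
```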
It makes scheduling decisions in a batch mode for each update cycle.
"""

def __init__(self, gcs_client: Optional[GcsClient] = None):
I am hesitant to let the scheduler manage the replica status, which duplicates DeploymentState's responsibility. I may be missing some context; can you help me understand why the scheduler needs to manage all the replica statuses? (I was thinking of letting DeploymentState call the scheduler to schedule replicas & remove replicas.)
The deployment scheduler has its own state machine, which is different from the deployment state one: the state is based on whether we know the node id of the replica, since node id information is what matters for the scheduling implementation. For example, the deployment scheduler doesn't have an UPDATING state, since from the scheduler's point of view it's the same as RUNNING (i.e., the replica's node id is known).
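An illustrative (not actual) enum for the scheduler-side view described here:

```python
import enum


class SchedulerReplicaStateSketch(enum.Enum):
    """Sketch of a scheduler-side replica state machine.

    The scheduler only needs to know whether a replica's node id is known,
    so several DeploymentState states collapse into one here.
    """
    PENDING = enum.auto()     # scheduling requested, actor not created yet
    LAUNCHING = enum.auto()   # actor created, node id not reported yet
    RECOVERING = enum.auto()  # recovered after controller restart, node id unknown
    RUNNING = enum.auto()     # node id known (covers RUNNING and UPDATING upstream)
```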
Nice work so far! I left some comments.
def _notify_running_replicas_changed(self):
def notify_running_replicas_changed(self) -> None:
    running_replica_infos = self.get_running_replica_infos()
    if (
To confirm, this isn't a behavior change, right? This is an optimization to reduce the number of times we send a notification with the LongPollHost?
There is no behavior change.
Sounds good, thanks.
Is this related to the scheduling change, or just an independent change? If unrelated, please separate it into its own PR.
Yea, it's related.
Nice work!
# so that we can make sure we don't schedule two replicas on the same node.
return

all_nodes = {node_id for node_id, _ in get_all_node_ids(self._gcs_client)}
why's the custom gcs_client necessary here?
Because get_all_node_ids needs a gcs_client as an argument.
for replica in self._replicas.pop(states=[ReplicaState.STARTING]):
    self._stop_replica(replica)

return upscale
Confused by this return -- why are we early-returning the upscale list in the downscale case?
is this just to early return with an empty list?
Yea, it's just an early return with an empty list. I changed it to return an empty list directly to make that obvious.
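For clarity, a small sketch of the shape being discussed; the names and placeholder request objects are illustrative:

```python
def scale_deployment_sketch(target_num_replicas, running_replicas):
    """Return a list of (placeholder) upscale requests, or [] when downscaling."""
    if len(running_replicas) >= target_num_replicas:
        # Downscale path: stop the extras and return an explicit empty list,
        # which reads more clearly than returning a never-populated `upscale`.
        for replica in running_replicas[target_num_replicas:]:
            replica.stop()
        return []

    # Upscale path: one placeholder request per missing replica.
    return [{"new_replica_index": i}
            for i in range(target_num_replicas - len(running_replicas))]
```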
num_existing_replicas = self._replicas.count()
if num_existing_replicas >= self._target_state.num_replicas:
num_running_replicas = self._replicas.count(states=[ReplicaState.RUNNING])
if num_running_replicas >= self._target_state.num_replicas:
    for replica in self._replicas.pop(states=[ReplicaState.STARTING]):
        self._stop_replica(replica)
shouldn't this logic live in the scheduler rather than here as part of the downscale request?
It's not the normal downscaling (the driver deployment doesn't downscale based on qps). It's about cancelling extra replicas.
I added some comments to make it clear:
# Cancel starting replicas when driver deployment state creates
# more replicas than alive nodes.
# For example, get_all_node_ids returns 4 nodes when
# the driver deployment state decides the target number of replicas,
# but later on when the deployment scheduler schedules these 4 replicas,
# there are only 3 alive nodes (1 node dies in between).
# In this case, 1 replica will be stuck in PENDING_ALLOCATION and we
# cancel it here.
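A hedged sketch of that cancellation check; the replica container API here is simplified rather than the real one used in the diff above:

```python
from enum import Enum


class ReplicaState(Enum):
    STARTING = "STARTING"
    RUNNING = "RUNNING"


def cancel_extra_starting_replicas(replicas, target_num_replicas, stop_replica):
    """Sketch of the check quoted above.

    For the driver deployment, the target replica count is derived from the
    node count at decision time. If a node dies before scheduling happens,
    one replica can be left STARTING with nowhere to go; once enough replicas
    are RUNNING, the leftover STARTING ones are cancelled.
    """
    num_running = sum(1 for _, state in replicas if state is ReplicaState.RUNNING)
    if num_running >= target_num_replicas:
        for replica, state in replicas:
            if state is ReplicaState.STARTING:
                stop_replica(replica)
```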
deleted, recovering, upscale, downscale = deployment_state.update()
if upscale:
    upscales[deployment_name] = upscale
if downscale:
    downscales[deployment_name] = downscale
-deleted, recovering, upscale, downscale = deployment_state.update()
-if upscale:
-    upscales[deployment_name] = upscale
-if downscale:
-    downscales[deployment_name] = downscale
+deleted, recovering, upscales[deployment_name], downscales[deployment_name] = deployment_state.update()
I don't want to add to the upscales/downscales dicts if the deployment has no upscale or downscale request.
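In other words (illustrative helper, not the controller's actual code), only deployments with work end up in the dicts the scheduler iterates over:

```python
def collect_scheduling_requests(deployment_states):
    """Sketch: gather per-deployment scale requests for one batch schedule() call."""
    upscales, downscales = {}, {}
    for name, state in deployment_states.items():
        deleted, recovering, upscale, downscale = state.update()
        # Only record deployments that actually have something to schedule;
        # the suggested one-liner would insert (possibly empty) entries for
        # every deployment on every update cycle.
        if upscale:
            upscales[name] = upscale
        if downscale:
            downscales[name] = downscale
    return upscales, downscales
```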
LGTM, let's merge after branch cut!
…oject#36588) Separate the serve scheduling logic into its own class and switch to batch scheduling for making better scheduling decisions. Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com> Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
Why are these changes needed?
Separate the serve scheduling logic into its own class and switch to batch scheduling to make better scheduling decisions.
Related issue number
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.