From ab62fe9d003d90ac70276e788121c7d7484cff12 Mon Sep 17 00:00:00 2001 From: apatke Date: Thu, 5 Sep 2024 20:19:37 +0000 Subject: [PATCH 01/12] add priority scheduling --- vllm/config.py | 6 ++- vllm/core/scheduler.py | 91 +++++++++++++++++++++++++++++++++++++++ vllm/engine/llm_engine.py | 17 ++++++-- vllm/entrypoints/llm.py | 11 ++++- vllm/sequence.py | 4 ++ 5 files changed, 121 insertions(+), 8 deletions(-) diff --git a/vllm/config.py b/vllm/config.py index 0a34dabf57e7c..1db9d57aee531 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -934,7 +934,7 @@ class SchedulerConfig: workers instead of an entire data. It should be enabled only when SPMD worker architecture is enabled. I.e., VLLM_USE_RAY_SPMD_WORKER=1 - + policy: The scheduling policy to use. "fcfs" (default) or "priority". """ def __init__(self, @@ -948,7 +948,8 @@ def __init__(self, embedding_mode: Optional[bool] = False, preemption_mode: Optional[str] = None, num_scheduler_steps: int = 1, - send_delta_data: bool = False) -> None: + send_delta_data: bool = False, + policy: str = "sp") -> None: if max_num_batched_tokens is not None: self.max_num_batched_tokens = max_num_batched_tokens else: @@ -979,6 +980,7 @@ def __init__(self, self.preemption_mode = preemption_mode self.num_scheduler_steps = num_scheduler_steps self.send_delta_data = send_delta_data + self.policy = policy self._verify_args() def _verify_args(self) -> None: diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 4c2f715820317..51b1d05225eac 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -763,6 +763,94 @@ def _get_prompt_limit(self, seq_group: SequenceGroup) -> int: else: return prompt_limit + + def _get_priority(self, seq_group: SequenceGroup) -> float: + """ Get the priority of the sequence group. Highest preference to user-defined priority, followed by arrival time. + Args: + seq_group: The sequence group input. + Returns: + The priority of the sequence group. + """ + return (seq_group.priority,seq_group.arrival_time) + + def _schedule_priority_preemption( + self, + budget: SchedulingBudget, + ) -> Tuple[deque, deque, int]: + """Force preempt requests from the running queue + if their priority is lower. + Args: + policy: Scheduling policy for sorting waiting and running queues. + budget: The scheduling budget. The argument is in-place updated + when any requests are scheduled. + Returns: + A count of priority-based preemptions. 
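The key returned by `_get_priority` above is the tuple `(priority, arrival_time)`, and the scheduler sorts both queues ascending on it, so a numerically lower `priority` value is served first and `arrival_time` only breaks ties between equal priorities. A minimal standalone sketch of that ordering, with plain dicts standing in for SequenceGroup objects:

from collections import deque

requests = [
    {"id": "a", "priority": 1, "arrival_time": 10.0},
    {"id": "b", "priority": 0, "arrival_time": 12.0},
    {"id": "c", "priority": 0, "arrival_time": 11.0},
]
# Python compares tuples element-wise: priority decides first, arrival time
# only matters when priorities are equal.
ordered = deque(sorted(requests, key=lambda r: (r["priority"], r["arrival_time"])))
print([r["id"] for r in ordered])  # ['c', 'b', 'a']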
+ """ + + running_queue = self.running + waiting_queue = self.waiting + + running_queue = deque(sorted(running_queue, key=lambda x: self._get_priority(x))) + waiting_queue = deque(sorted(waiting_queue, key=lambda x: self._get_priority(x))) + + blocks_to_swap_out: List[Tuple[int, int]] = [] + force_preemption_cnt = 0 + + + #print("Initial waiting queue is ", [x.request_id for x in waiting_queue]) + while waiting_queue: + print("waiting queue priority is ", [x.priority for x in waiting_queue]) + print("front of waiting queue is ", waiting_queue[0].request_id) + seq_group = waiting_queue[0] + waiting_queue.popleft() + num_new_seqs = seq_group.get_max_num_running_seqs() + num_new_tokens = self._get_num_new_tokens(seq_group, + SequenceStatus.WAITING, + False, budget) + + now = time.time() + # Only preempt if priority inversion exists + while running_queue and self._get_priority(running_queue[-1]) > self._get_priority(seq_group): + print("priority inversion exists") + import pdb; pdb.set_trace() + #Only preempt if waiting sequence cannot be allocated + print("Number of new tokens is ", num_new_tokens) + print("Number of new seqs is ", num_new_seqs) + can_allocate = self.block_manager.can_allocate(seq_group) + print("Allocation status is ", can_allocate) + if (num_new_tokens > 0 + and budget.can_schedule(num_new_tokens=num_new_tokens, + num_new_seqs=num_new_seqs) + and can_allocate == AllocStatus.OK): + break + + #Adjust budget to remove the victim sequence group + print("victim sequence group is ", running_queue[-1].request_id) + vseq_group = running_queue.pop() + num_running_tokens = self._get_num_new_tokens( + vseq_group, SequenceStatus.RUNNING, False, budget) + budget.subtract_num_batched_tokens(vseq_group.request_id, + num_running_tokens) + num_running_seqs = vseq_group.get_max_num_running_seqs() + budget.subtract_num_seqs(vseq_group.request_id, + num_running_seqs) + + #Preempt out the victim sequence group + self._preempt(vseq_group, blocks_to_swap_out, + PreemptionMode.RECOMPUTE) + waiting_queue.appendleft(vseq_group) + force_preemption_cnt += 1 + #Put the sequence back into the waiting queue + waiting_queue.appendleft(seq_group) + + + waiting_queue = deque(sorted(waiting_queue, key=lambda x: self._get_priority(x))) + + self.waiting = waiting_queue + self.running = running_queue + #print("Final waiting queue is ", [x.request_id for x in self.waiting]) + return force_preemption_cnt + def _schedule_prefills( self, budget: SchedulingBudget, @@ -913,6 +1001,9 @@ def _schedule_default(self) -> SchedulerOutputs: prefills = self._schedule_prefills(budget, curr_loras, enable_chunking=False) + + if len(prefills.seq_groups)==0 and self.scheduler_config.policy == "sp": + forced_preemptions = self._schedule_priority_preemption(budget) # Don't schedule decodes if prefills are scheduled. # NOTE: If `_schedule_prefills` doesn't enable chunking, self.running diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 92c02072593e6..9cc48782cc2d4 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -657,6 +657,7 @@ def _add_processed_request( lora_request: Optional[LoRARequest], prompt_adapter_request: Optional[PromptAdapterRequest], trace_headers: Optional[Mapping[str, str]] = None, + priority: Optional[int] = None, ) -> None: self._validate_model_inputs(processed_inputs) # Create the sequences. 
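Stripped of the budget and block-manager details, the preemption pass this patch adds to the scheduler reduces to roughly the following self-contained sketch (it considers only the head of the waiting queue, the form later patches in this series settle on; the `fits` callback stands in for the combined token-budget and block-allocation checks, and dicts stand in for sequence groups):

from collections import deque
from typing import Callable, Deque, Tuple

def priority_key(req: dict) -> Tuple[int, float]:
    return (req["priority"], req["arrival_time"])

def priority_preempt(waiting: Deque[dict], running: Deque[dict],
                     fits: Callable[[dict], bool]) -> Tuple[Deque[dict], Deque[dict], int]:
    """Pop victims off the low-priority tail of the running queue until the
    highest-priority waiting request fits or no priority inversion remains."""
    running = deque(sorted(running, key=priority_key))
    preempted = 0
    if waiting:
        head = waiting.popleft()
        # Preempt only while a priority inversion exists and the head still
        # cannot be scheduled; victims go back to waiting (RECOMPUTE-style).
        while running and priority_key(running[-1]) > priority_key(head) and not fits(head):
            waiting.appendleft(running.pop())
            preempted += 1
        waiting.appendleft(head)
    return deque(sorted(waiting, key=priority_key)), running, preempted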
@@ -687,7 +688,8 @@ def _add_processed_request( lora_request=lora_request, trace_headers=trace_headers, prompt_adapter_request=prompt_adapter_request, - encoder_seq=encoder_seq) + encoder_seq=encoder_seq, + priority=priority) elif isinstance(params, PoolingParams): seq_group = self._create_sequence_group_with_pooling( request_id, @@ -696,7 +698,8 @@ def _add_processed_request( arrival_time=arrival_time, lora_request=lora_request, prompt_adapter_request=prompt_adapter_request, - encoder_seq=encoder_seq) + encoder_seq=encoder_seq, + priority=priority) else: raise ValueError( "Either SamplingParams or PoolingParams must be provided.") @@ -1049,6 +1052,7 @@ def add_request( lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, + priority: Optional[int] = None, ) -> None: """Add a request to the engine's request pool. @@ -1113,6 +1117,7 @@ def add_request( lora_request=lora_request, prompt_adapter_request=prompt_adapter_request, trace_headers=trace_headers, + priority=priority, ) def _create_sequence_group_with_sampling( @@ -1125,6 +1130,7 @@ def _create_sequence_group_with_sampling( trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, encoder_seq: Optional[Sequence] = None, + priority: Optional[int] = None, ) -> SequenceGroup: """Creates a SequenceGroup with SamplingParams.""" max_logprobs = self.get_model_config().max_logprobs @@ -1151,7 +1157,8 @@ def _create_sequence_group_with_sampling( lora_request=lora_request, trace_headers=trace_headers, prompt_adapter_request=prompt_adapter_request, - encoder_seq=encoder_seq) + encoder_seq=encoder_seq, + priority=priority) return seq_group @@ -1164,6 +1171,7 @@ def _create_sequence_group_with_pooling( lora_request: Optional[LoRARequest], prompt_adapter_request: Optional[PromptAdapterRequest], encoder_seq: Optional[Sequence] = None, + priority: Optional[int] = None, ) -> SequenceGroup: """Creates a SequenceGroup with PoolingParams.""" # Defensive copy of PoolingParams, which are used by the pooler @@ -1176,7 +1184,8 @@ def _create_sequence_group_with_pooling( lora_request=lora_request, pooling_params=pooling_params, prompt_adapter_request=prompt_adapter_request, - encoder_seq=encoder_seq) + encoder_seq=encoder_seq, + priority=priority) return seq_group def abort_request(self, request_id: Union[str, Iterable[str]]) -> None: diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py index 0edd4bfaecd6a..f1baffa067ef4 100644 --- a/vllm/entrypoints/llm.py +++ b/vllm/entrypoints/llm.py @@ -283,8 +283,10 @@ def generate( lora_request: Optional[Union[List[LoRARequest], LoRARequest]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, guided_options_request: Optional[Union[LLMGuidedOptions, - GuidedDecodingRequest]] = None + GuidedDecodingRequest]] = None, + priority: Optional[List[int]] = None, ) -> List[RequestOutput]: + print("Priority is:", priority) """Generates the completions for the input prompts. 
This class automatically batches the given prompts, considering @@ -342,7 +344,8 @@ def generate( params=sampling_params, lora_request=lora_request, prompt_adapter_request=prompt_adapter_request, - guided_options=guided_options_request) + guided_options=guided_options_request, + priority=priority) outputs = self._run_engine(use_tqdm=use_tqdm) return LLMEngine.validate_outputs(outputs, RequestOutput) @@ -605,6 +608,7 @@ def _validate_and_add_requests( lora_request: Optional[Union[Sequence[LoRARequest], LoRARequest]], prompt_adapter_request: Optional[PromptAdapterRequest], guided_options: Optional[GuidedDecodingRequest] = None, + priority: Optional[List[int]] = None, ) -> None: if isinstance(inputs, (str, dict)): # Convert a single prompt to a list. @@ -636,6 +640,7 @@ def _validate_and_add_requests( lora_request=lora_request[i] if isinstance( lora_request, Sequence) else lora_request, prompt_adapter_request=prompt_adapter_request, + priority=priority[i] if priority else None, ) def _add_request( @@ -644,6 +649,7 @@ def _add_request( params: Union[SamplingParams, PoolingParams], lora_request: Optional[LoRARequest] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, + priority: Optional[int] = None, ) -> None: request_id = str(next(self.request_counter)) self.llm_engine.add_request( @@ -652,6 +658,7 @@ def _add_request( params, lora_request=lora_request, prompt_adapter_request=prompt_adapter_request, + priority=priority, ) def _add_guided_processor( diff --git a/vllm/sequence.py b/vllm/sequence.py index e7cde87f605a7..d815cfcb34ef0 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -581,6 +581,7 @@ class SequenceGroup: unless you are working with an encoder/decoder model. trace_headers: OpenTelemetry trace headers. prompt_adapter_request: Prompt Adapter request. + priority: User-defined priority of the request. 
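From the offline `LLM.generate` entrypoint patched above, `priority` is a list with one entry per prompt, forwarded per request into the engine. A usage sketch once the series is applied (the model name and priority values are illustrative, lower values end up scheduled first, and non-zero priorities assume the scheduler runs with the priority policy, whose configuration flag is not part of this series):

from vllm import LLM, SamplingParams

llm = LLM(model="facebook/opt-125m")
prompts = ["Write a haiku about queues.", "Urgent: summarize this ticket."]
priorities = [1, 0]  # one entry per prompt; 0 is scheduled ahead of 1
outputs = llm.generate(prompts,
                       SamplingParams(max_tokens=64),
                       priority=priorities)
for out in outputs:
    print(out.outputs[0].text)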
""" def __init__( @@ -595,9 +596,11 @@ def __init__( encoder_seq: Optional[Sequence] = None, trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, + priority: Optional[int] = None, ) -> None: self.request_id = request_id self.seqs = seqs + self.arrival_time = arrival_time self.is_single_seq = len(seqs) == 1 self.seqs_dict = {seq.seq_id: seq for seq in seqs} @@ -615,6 +618,7 @@ def __init__( self.prompt_adapter_request = prompt_adapter_request self.encoder_seq = encoder_seq self.trace_headers = trace_headers + self.priority = priority @property def prompt(self) -> Optional[str]: From e488a5b76c8c32df027d178ae3d184bd7d9a82e1 Mon Sep 17 00:00:00 2001 From: apatke Date: Thu, 5 Sep 2024 20:20:03 +0000 Subject: [PATCH 02/12] add priority benchmark --- benchmarks/benchmark_prioritization.py | 306 +++++++++++++++++++++++++ 1 file changed, 306 insertions(+) create mode 100644 benchmarks/benchmark_prioritization.py diff --git a/benchmarks/benchmark_prioritization.py b/benchmarks/benchmark_prioritization.py new file mode 100644 index 0000000000000..6ba7e8b5655d8 --- /dev/null +++ b/benchmarks/benchmark_prioritization.py @@ -0,0 +1,306 @@ +"""Benchmark offline prioritization.""" +import argparse +import json +import random +import time +from typing import List, Optional, Tuple + +from transformers import AutoTokenizer, PreTrainedTokenizerBase + +from vllm.model_executor.layers.quantization import QUANTIZATION_METHODS + + +def sample_requests( + dataset_path: str, + num_requests: int, + tokenizer: PreTrainedTokenizerBase, + fixed_output_len: Optional[int], +) -> List[Tuple[str, int, int]]: + if fixed_output_len is not None and fixed_output_len < 4: + raise ValueError("output_len too small") + + # Load the dataset. + with open(dataset_path) as f: + dataset = json.load(f) + # Filter out the conversations with less than 2 turns. + dataset = [data for data in dataset if len(data["conversations"]) >= 2] + # Only keep the first two turns of each conversation. + dataset = [(data["conversations"][0]["value"], + data["conversations"][1]["value"]) for data in dataset] + + # Shuffle the dataset. + random.shuffle(dataset) + + # Filter out sequences that are too long or too short + filtered_dataset: List[Tuple[str, int, int]] = [] + for i in range(len(dataset)): + if len(filtered_dataset) == num_requests: + break + + # Tokenize the prompts and completions. + prompt = dataset[i][0] + prompt_token_ids = tokenizer(prompt).input_ids + completion = dataset[i][1] + completion_token_ids = tokenizer(completion).input_ids + prompt_len = len(prompt_token_ids) + output_len = len(completion_token_ids + ) if fixed_output_len is None else fixed_output_len + if prompt_len < 4 or output_len < 4: + # Prune too short sequences. + continue + if prompt_len > 1024 or prompt_len + output_len > 2048: + # Prune too long sequences. 
+ continue + + #Select a equi-probable random priority + priority = 0 if random.random() < 0.5 else 1 + + filtered_dataset.append((prompt, prompt_len, output_len, priority)) + + return filtered_dataset + + +def run_vllm( + requests: List[Tuple[str, int, int]], + model: str, + tokenizer: str, + quantization: Optional[str], + tensor_parallel_size: int, + seed: int, + n: int, + use_beam_search: bool, + trust_remote_code: bool, + dtype: str, + max_model_len: Optional[int], + enforce_eager: bool, + kv_cache_dtype: str, + quantization_param_path: Optional[str], + device: str, + enable_prefix_caching: bool, + enable_chunked_prefill: bool, + max_num_batched_tokens: int, + gpu_memory_utilization: float = 0.9, + download_dir: Optional[str] = None, +) -> float: + from vllm import LLM, SamplingParams + llm = LLM( + model=model, + tokenizer=tokenizer, + quantization=quantization, + tensor_parallel_size=tensor_parallel_size, + seed=seed, + trust_remote_code=trust_remote_code, + dtype=dtype, + max_model_len=max_model_len, + gpu_memory_utilization=gpu_memory_utilization, + enforce_eager=enforce_eager, + kv_cache_dtype=kv_cache_dtype, + quantization_param_path=quantization_param_path, + device=device, + enable_prefix_caching=enable_prefix_caching, + download_dir=download_dir, + enable_chunked_prefill=enable_chunked_prefill, + max_num_batched_tokens=max_num_batched_tokens, + disable_log_stats=False, + ) + + # Add the requests to the engine. + prompts = [] + sampling_params = [] + priority = [] + for prompt, _, output_len, _priority in requests: + prompts.append(prompt) + priority.append(_priority) + sampling_params.append( + SamplingParams( + n=n, + temperature=0.0 if use_beam_search else 1.0, + top_p=1.0, + use_beam_search=use_beam_search, + ignore_eos=True, + max_tokens=output_len, + )) + + start = time.perf_counter() + llm.generate(prompts, + sampling_params, + priority=priority, + use_tqdm=True) + end = time.perf_counter() + return end - start + + +def main(args: argparse.Namespace): + print(args) + random.seed(args.seed) + + # Sample the requests. + tokenizer = AutoTokenizer.from_pretrained( + args.tokenizer, trust_remote_code=args.trust_remote_code) + if args.dataset is None: + # Synthesize a prompt with the given input length. 
+ prompt = "hi" * (args.input_len - 1) + requests = [(prompt, args.input_len, args.output_len) + for _ in range(args.num_prompts)] + else: + requests = sample_requests(args.dataset, args.num_prompts, tokenizer, + args.output_len) + + if args.backend == "vllm": + elapsed_time = run_vllm( + requests, args.model, args.tokenizer, args.quantization, + args.tensor_parallel_size, args.seed, args.n, args.use_beam_search, + args.trust_remote_code, args.dtype, args.max_model_len, + args.enforce_eager, args.kv_cache_dtype, + args.quantization_param_path, args.device, + args.enable_prefix_caching, args.enable_chunked_prefill, + args.max_num_batched_tokens, args.gpu_memory_utilization, + args.download_dir) + else: + raise ValueError(f"Unknown backend: {args.backend}") + total_num_tokens = sum(prompt_len + output_len + for _, prompt_len, output_len, priority in requests) + print(f"Throughput: {len(requests) / elapsed_time:.2f} requests/s, " + f"{total_num_tokens / elapsed_time:.2f} tokens/s") + + # Output JSON results if specified + if args.output_json: + results = { + "elapsed_time": elapsed_time, + "num_requests": len(requests), + "total_num_tokens": total_num_tokens, + "requests_per_second": len(requests) / elapsed_time, + "tokens_per_second": total_num_tokens / elapsed_time, + } + with open(args.output_json, "w") as f: + json.dump(results, f, indent=4) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Benchmark the throughput.") + parser.add_argument("--backend", + type=str, + choices=["vllm", "hf", "mii"], + default="vllm") + parser.add_argument("--dataset", + type=str, + default=None, + help="Path to the dataset.") + parser.add_argument("--input-len", + type=int, + default=None, + help="Input prompt length for each request") + parser.add_argument("--output-len", + type=int, + default=None, + help="Output length for each request. Overrides the " + "output length from the dataset.") + parser.add_argument("--model", + type=str, + default="facebook/opt-125m") + parser.add_argument("--tokenizer", type=str, default=None) + parser.add_argument('--quantization', + '-q', + choices=[*QUANTIZATION_METHODS, None], + default=None) + parser.add_argument("--tensor-parallel-size", "-tp", type=int, default=1) + parser.add_argument("--n", + type=int, + default=1, + help="Number of generated sequences per prompt.") + parser.add_argument("--use-beam-search", action="store_true") + parser.add_argument("--num-prompts", + type=int, + default=200, + help="Number of prompts to process.") + parser.add_argument("--seed", type=int, default=0) + parser.add_argument('--trust-remote-code', + action='store_true', + help='trust remote code from huggingface') + parser.add_argument( + '--max-model-len', + type=int, + default=None, + help='Maximum length of a sequence (including prompt and output). ' + 'If None, will be derived from the model.') + parser.add_argument( + '--dtype', + type=str, + default='auto', + choices=['auto', 'half', 'float16', 'bfloat16', 'float', 'float32'], + help='data type for model weights and activations. ' + 'The "auto" option will use FP16 precision ' + 'for FP32 and FP16 models, and BF16 precision ' + 'for BF16 models.') + parser.add_argument('--gpu-memory-utilization', + type=float, + default=0.9, + help='the fraction of GPU memory to be used for ' + 'the model executor, which can range from 0 to 1.' 
+ 'If unspecified, will use the default value of 0.9.') + parser.add_argument("--enforce-eager", + action="store_true", + help="enforce eager execution") + parser.add_argument( + '--kv-cache-dtype', + type=str, + choices=['auto', 'fp8', 'fp8_e5m2', 'fp8_e4m3'], + default="auto", + help='Data type for kv cache storage. If "auto", will use model ' + 'data type. CUDA 11.8+ supports fp8 (=fp8_e4m3) and fp8_e5m2. ' + 'ROCm (AMD GPU) supports fp8 (=fp8_e4m3)') + parser.add_argument( + '--quantization-param-path', + type=str, + default=None, + help='Path to the JSON file containing the KV cache scaling factors. ' + 'This should generally be supplied, when KV cache dtype is FP8. ' + 'Otherwise, KV cache scaling factors default to 1.0, which may cause ' + 'accuracy issues. FP8_E5M2 (without scaling) is only supported on ' + 'cuda version greater than 11.8. On ROCm (AMD GPU), FP8_E4M3 is ' + 'instead supported for common inference criteria.') + parser.add_argument( + "--device", + type=str, + default="cuda", + choices=["cuda", "cpu"], + help='device type for vLLM execution, supporting CUDA and CPU.') + parser.add_argument( + "--enable-prefix-caching", + action='store_true', + help="enable automatic prefix caching for vLLM backend.") + parser.add_argument("--enable-chunked-prefill", + action='store_true', + help="enable chunked prefill for vLLM backend.") + parser.add_argument('--max-num-batched-tokens', + type=int, + default=None, + help='maximum number of batched tokens per ' + 'iteration') + parser.add_argument('--download-dir', + type=str, + default=None, + help='directory to download and load the weights, ' + 'default to the default cache dir of huggingface') + parser.add_argument( + '--output-json', + type=str, + default=None, + help='Path to save the throughput results in JSON format.') + + parser.add_argument( + '--scheduling-policy', + type=bool, + default='sp', + help='sp: Strict Priority, fcfs: First Come First Serve') + + args = parser.parse_args() + if args.tokenizer is None: + args.tokenizer = args.model + if args.dataset is None: + assert args.input_len is not None + assert args.output_len is not None + else: + assert args.input_len is None + + main(args) From 520d07feaeb85accf28bf0eff128605624f9745d Mon Sep 17 00:00:00 2001 From: apatke Date: Thu, 5 Sep 2024 20:50:10 +0000 Subject: [PATCH 03/12] formatting fixes --- vllm/core/scheduler.py | 55 ++++++++++++++++++------------------------ 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index bdb6862b7c6a4..df191345539b3 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -763,24 +763,26 @@ def _get_prompt_limit(self, seq_group: SequenceGroup) -> int: else: return prompt_limit - - def _get_priority(self, seq_group: SequenceGroup) -> float: - """ Get the priority of the sequence group. Highest preference to user-defined priority, followed by arrival time. + def _get_priority(self, + seq_group: SequenceGroup) -> Tuple[Optional[int], float]: + """ Get the priority of the sequence group. + Highest preference to user-defined priority, followed by arrival time. Args: seq_group: The sequence group input. Returns: The priority of the sequence group. """ - return (seq_group.priority,seq_group.arrival_time) + return (seq_group.priority, seq_group.arrival_time) def _schedule_priority_preemption( self, budget: SchedulingBudget, - ) -> Tuple[deque, deque, int]: - """Force preempt requests from the running queue - if their priority is lower. 
+ ) -> int: + """Sorts waiting and running queue. Also, force preempt requests + from the running queue if their priority is lower. + Priority-based preemption is used with the strict priority + (sp) policy. Args: - policy: Scheduling policy for sorting waiting and running queues. budget: The scheduling budget. The argument is in-place updated when any requests are scheduled. Returns: @@ -790,34 +792,27 @@ def _schedule_priority_preemption( running_queue = self.running waiting_queue = self.waiting - running_queue = deque(sorted(running_queue, key=lambda x: self._get_priority(x))) - waiting_queue = deque(sorted(waiting_queue, key=lambda x: self._get_priority(x))) + running_queue = deque( + sorted(running_queue, key=lambda x: self._get_priority(x))) + waiting_queue = deque( + sorted(waiting_queue, key=lambda x: self._get_priority(x))) blocks_to_swap_out: List[Tuple[int, int]] = [] force_preemption_cnt = 0 - - #print("Initial waiting queue is ", [x.request_id for x in waiting_queue]) - while waiting_queue: - print("waiting queue priority is ", [x.priority for x in waiting_queue]) - print("front of waiting queue is ", waiting_queue[0].request_id) + if waiting_queue: seq_group = waiting_queue[0] waiting_queue.popleft() num_new_seqs = seq_group.get_max_num_running_seqs() num_new_tokens = self._get_num_new_tokens(seq_group, SequenceStatus.WAITING, False, budget) - - now = time.time() + # Only preempt if priority inversion exists - while running_queue and self._get_priority(running_queue[-1]) > self._get_priority(seq_group): - print("priority inversion exists") - import pdb; pdb.set_trace() + while running_queue and self._get_priority( + running_queue[-1]) > self._get_priority(seq_group): #Only preempt if waiting sequence cannot be allocated - print("Number of new tokens is ", num_new_tokens) - print("Number of new seqs is ", num_new_seqs) can_allocate = self.block_manager.can_allocate(seq_group) - print("Allocation status is ", can_allocate) if (num_new_tokens > 0 and budget.can_schedule(num_new_tokens=num_new_tokens, num_new_seqs=num_new_seqs) @@ -825,7 +820,6 @@ def _schedule_priority_preemption( break #Adjust budget to remove the victim sequence group - print("victim sequence group is ", running_queue[-1].request_id) vseq_group = running_queue.pop() num_running_tokens = self._get_num_new_tokens( vseq_group, SequenceStatus.RUNNING, False, budget) @@ -842,15 +836,11 @@ def _schedule_priority_preemption( force_preemption_cnt += 1 #Put the sequence back into the waiting queue waiting_queue.appendleft(seq_group) - - - waiting_queue = deque(sorted(waiting_queue, key=lambda x: self._get_priority(x))) self.waiting = waiting_queue self.running = running_queue - #print("Final waiting queue is ", [x.request_id for x in self.waiting]) return force_preemption_cnt - + def _schedule_prefills( self, budget: SchedulingBudget, @@ -1001,9 +991,10 @@ def _schedule_default(self) -> SchedulerOutputs: prefills = self._schedule_prefills(budget, curr_loras, enable_chunking=False) - - if len(prefills.seq_groups)==0 and self.scheduler_config.policy == "sp": - forced_preemptions = self._schedule_priority_preemption(budget) + + if len(prefills.seq_groups + ) == 0 and self.scheduler_config.policy == "sp": + self._schedule_priority_preemption(budget) # Don't schedule decodes if prefills are scheduled. 
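The budget bookkeeping around a victim follows a simple invariant: whatever the victim had reserved is subtracted before it is preempted, so the waiting request's `can_schedule` check sees the freed capacity on the next pass through the loop. A toy stand-in for that accounting (only the shape of the idea, not vLLM's actual `SchedulingBudget` API):

class TokenSeqBudget:
    """Counts batched tokens and sequences against fixed caps."""

    def __init__(self, token_budget: int, max_num_seqs: int):
        self.token_budget = token_budget
        self.max_num_seqs = max_num_seqs
        self.num_batched_tokens = 0
        self.num_curr_seqs = 0

    def can_schedule(self, num_new_tokens: int, num_new_seqs: int) -> bool:
        return (self.num_batched_tokens + num_new_tokens <= self.token_budget
                and self.num_curr_seqs + num_new_seqs <= self.max_num_seqs)

    def subtract(self, num_tokens: int, num_seqs: int) -> None:
        # Called for a preemption victim: releasing its share is what lets
        # the waiting request's can_schedule() succeed afterwards.
        self.num_batched_tokens -= num_tokens
        self.num_curr_seqs -= num_seqs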
# NOTE: If `_schedule_prefills` doesn't enable chunking, self.running From 783eb2f127558c22fa1c4867ba760646a18fd51e Mon Sep 17 00:00:00 2001 From: apatke Date: Fri, 6 Sep 2024 17:54:45 +0000 Subject: [PATCH 04/12] changed sp to priority --- benchmarks/benchmark_prioritization.py | 15 ++------------- vllm/config.py | 2 +- vllm/core/scheduler.py | 10 +++++----- vllm/engine/arg_utils.py | 1 + vllm/entrypoints/llm.py | 1 - 5 files changed, 9 insertions(+), 20 deletions(-) diff --git a/benchmarks/benchmark_prioritization.py b/benchmarks/benchmark_prioritization.py index 6ba7e8b5655d8..0ba29fabca59b 100644 --- a/benchmarks/benchmark_prioritization.py +++ b/benchmarks/benchmark_prioritization.py @@ -122,10 +122,7 @@ def run_vllm( )) start = time.perf_counter() - llm.generate(prompts, - sampling_params, - priority=priority, - use_tqdm=True) + llm.generate(prompts, sampling_params, priority=priority, use_tqdm=True) end = time.perf_counter() return end - start @@ -195,9 +192,7 @@ def main(args: argparse.Namespace): default=None, help="Output length for each request. Overrides the " "output length from the dataset.") - parser.add_argument("--model", - type=str, - default="facebook/opt-125m") + parser.add_argument("--model", type=str, default="facebook/opt-125m") parser.add_argument("--tokenizer", type=str, default=None) parser.add_argument('--quantization', '-q', @@ -288,12 +283,6 @@ def main(args: argparse.Namespace): default=None, help='Path to save the throughput results in JSON format.') - parser.add_argument( - '--scheduling-policy', - type=bool, - default='sp', - help='sp: Strict Priority, fcfs: First Come First Serve') - args = parser.parse_args() if args.tokenizer is None: args.tokenizer = args.model diff --git a/vllm/config.py b/vllm/config.py index 4656dde454e4b..58ab0d6f16ad0 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -972,7 +972,7 @@ def __init__(self, preemption_mode: Optional[str] = None, num_scheduler_steps: int = 1, send_delta_data: bool = False, - policy: str = "sp") -> None: + policy: str = "priority") -> None: if max_num_batched_tokens is None: if enable_chunked_prefill: # It is the values that have the best balance between ITL diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index df191345539b3..433f8def4e8c6 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -780,8 +780,7 @@ def _schedule_priority_preemption( ) -> int: """Sorts waiting and running queue. Also, force preempt requests from the running queue if their priority is lower. - Priority-based preemption is used with the strict priority - (sp) policy. + Priority-based preemption is used with the priority policy. Args: budget: The scheduling budget. The argument is in-place updated when any requests are scheduled. 
@@ -794,8 +793,6 @@ def _schedule_priority_preemption( running_queue = deque( sorted(running_queue, key=lambda x: self._get_priority(x))) - waiting_queue = deque( - sorted(waiting_queue, key=lambda x: self._get_priority(x))) blocks_to_swap_out: List[Tuple[int, int]] = [] force_preemption_cnt = 0 @@ -837,6 +834,9 @@ def _schedule_priority_preemption( #Put the sequence back into the waiting queue waiting_queue.appendleft(seq_group) + waiting_queue = deque( + sorted(waiting_queue, key=lambda x: self._get_priority(x))) + self.waiting = waiting_queue self.running = running_queue return force_preemption_cnt @@ -993,7 +993,7 @@ def _schedule_default(self) -> SchedulerOutputs: enable_chunking=False) if len(prefills.seq_groups - ) == 0 and self.scheduler_config.policy == "sp": + ) == 0 and self.scheduler_config.policy == "priority": self._schedule_priority_preemption(budget) # Don't schedule decodes if prefills are scheduled. diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index f0b866db64324..82f653a5dcc62 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -562,6 +562,7 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: default=EngineArgs.scheduler_delay_factor, help='Apply a delay (of delay factor multiplied by previous' 'prompt latency) before scheduling next prompt.') + parser.add_argument( '--enable-chunked-prefill', action=StoreBoolean, diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py index 8e9d178ca2286..8d4ecdbb92393 100644 --- a/vllm/entrypoints/llm.py +++ b/vllm/entrypoints/llm.py @@ -286,7 +286,6 @@ def generate( GuidedDecodingRequest]] = None, priority: Optional[List[int]] = None, ) -> List[RequestOutput]: - print("Priority is:", priority) """Generates the completions for the input prompts. 
This class automatically batches the given prompts, considering From 5118c60c39177c15e72ccb27342c38db34b94b51 Mon Sep 17 00:00:00 2001 From: apatke Date: Fri, 6 Sep 2024 17:57:08 +0000 Subject: [PATCH 05/12] remove whitespace --- vllm/engine/arg_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 82f653a5dcc62..f0b866db64324 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -562,7 +562,6 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: default=EngineArgs.scheduler_delay_factor, help='Apply a delay (of delay factor multiplied by previous' 'prompt latency) before scheduling next prompt.') - parser.add_argument( '--enable-chunked-prefill', action=StoreBoolean, From b8350ecb9f9dd63c67e48a05b82e11203878f0d6 Mon Sep 17 00:00:00 2001 From: apatke Date: Fri, 13 Sep 2024 15:04:58 +0000 Subject: [PATCH 06/12] code readability --- vllm/config.py | 2 +- vllm/core/scheduler.py | 20 +++++++++----------- vllm/engine/llm_engine.py | 2 ++ vllm/entrypoints/llm.py | 2 ++ vllm/sequence.py | 2 +- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/vllm/config.py b/vllm/config.py index 58ab0d6f16ad0..7272a3781ae56 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -972,7 +972,7 @@ def __init__(self, preemption_mode: Optional[str] = None, num_scheduler_steps: int = 1, send_delta_data: bool = False, - policy: str = "priority") -> None: + policy: str = "fcfs") -> None: if max_num_batched_tokens is None: if enable_chunked_prefill: # It is the values that have the best balance between ITL diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 433f8def4e8c6..1d872341e2705 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -772,7 +772,7 @@ def _get_priority(self, Returns: The priority of the sequence group. 
""" - return (seq_group.priority, seq_group.arrival_time) + return seq_group.priority, seq_group.arrival_time def _schedule_priority_preemption( self, @@ -792,14 +792,13 @@ def _schedule_priority_preemption( waiting_queue = self.waiting running_queue = deque( - sorted(running_queue, key=lambda x: self._get_priority(x))) + sorted(running_queue, key=self._get_priority)) blocks_to_swap_out: List[Tuple[int, int]] = [] - force_preemption_cnt = 0 + force_preemption_count = 0 if waiting_queue: - seq_group = waiting_queue[0] - waiting_queue.popleft() + seq_group = waiting_queue.popleft() num_new_seqs = seq_group.get_max_num_running_seqs() num_new_tokens = self._get_num_new_tokens(seq_group, SequenceStatus.WAITING, @@ -808,9 +807,9 @@ def _schedule_priority_preemption( # Only preempt if priority inversion exists while running_queue and self._get_priority( running_queue[-1]) > self._get_priority(seq_group): - #Only preempt if waiting sequence cannot be allocated + # Only preempt if waiting sequence cannot be allocated can_allocate = self.block_manager.can_allocate(seq_group) - if (num_new_tokens > 0 + if (num_new_tokens and budget.can_schedule(num_new_tokens=num_new_tokens, num_new_seqs=num_new_seqs) and can_allocate == AllocStatus.OK): @@ -835,11 +834,11 @@ def _schedule_priority_preemption( waiting_queue.appendleft(seq_group) waiting_queue = deque( - sorted(waiting_queue, key=lambda x: self._get_priority(x))) + sorted(waiting_queue, key=self._get_priority)) self.waiting = waiting_queue self.running = running_queue - return force_preemption_cnt + return force_preemption_count def _schedule_prefills( self, @@ -992,8 +991,7 @@ def _schedule_default(self) -> SchedulerOutputs: curr_loras, enable_chunking=False) - if len(prefills.seq_groups - ) == 0 and self.scheduler_config.policy == "priority": + if not prefills.seq_groups and self.scheduler_config.policy == "priority": self._schedule_priority_preemption(budget) # Don't schedule decodes if prefills are scheduled. diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 4f09aa157b49d..f51d72f1c1fd7 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -1069,6 +1069,8 @@ def add_request( arrival_time: The arrival time of the request. If None, we use the current monotonic time. trace_headers: OpenTelemetry trace headers. + priority: The priority of the request. + Only appicable with priority scheduling. Details: - Set arrival_time to the current time if it is None. diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py index 8d4ecdbb92393..9e4a6f8fbcb69 100644 --- a/vllm/entrypoints/llm.py +++ b/vllm/entrypoints/llm.py @@ -303,6 +303,8 @@ def generate( lora_request: LoRA request to use for generation, if any. prompt_adapter_request: Prompt Adapter request to use for generation, if any. + priority: The priority of the requests, if any. + Only applicable when priority scheduling policy is enabled. 
Returns: A list of ``RequestOutput`` objects containing the diff --git a/vllm/sequence.py b/vllm/sequence.py index 6493eb9682db2..d84d0960dc937 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -596,7 +596,7 @@ def __init__( encoder_seq: Optional[Sequence] = None, trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, - priority: Optional[int] = None, + priority: Optional[int] = 0, ) -> None: self.request_id = request_id self.seqs = seqs From 6ae42e94f2919f1817c03adf586d4af0dca9c011 Mon Sep 17 00:00:00 2001 From: apatke Date: Fri, 13 Sep 2024 15:11:02 +0000 Subject: [PATCH 07/12] Formatting --- vllm/core/scheduler.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 1d872341e2705..0ed7678bb5815 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -791,8 +791,7 @@ def _schedule_priority_preemption( running_queue = self.running waiting_queue = self.waiting - running_queue = deque( - sorted(running_queue, key=self._get_priority)) + running_queue = deque(sorted(running_queue, key=self._get_priority)) blocks_to_swap_out: List[Tuple[int, int]] = [] force_preemption_count = 0 @@ -829,12 +828,11 @@ def _schedule_priority_preemption( self._preempt(vseq_group, blocks_to_swap_out, PreemptionMode.RECOMPUTE) waiting_queue.appendleft(vseq_group) - force_preemption_cnt += 1 + force_preemption_count += 1 #Put the sequence back into the waiting queue waiting_queue.appendleft(seq_group) - waiting_queue = deque( - sorted(waiting_queue, key=self._get_priority)) + waiting_queue = deque(sorted(waiting_queue, key=self._get_priority)) self.waiting = waiting_queue self.running = running_queue @@ -991,7 +989,8 @@ def _schedule_default(self) -> SchedulerOutputs: curr_loras, enable_chunking=False) - if not prefills.seq_groups and self.scheduler_config.policy == "priority": + if len(prefills.seq_groups + ) == 0 and self.scheduler_config.policy == "priority": self._schedule_priority_preemption(budget) # Don't schedule decodes if prefills are scheduled. 
From 22a00a6dc7de78b1a59e3396e0a359198d69ed3f Mon Sep 17 00:00:00 2001 From: apatke Date: Fri, 13 Sep 2024 15:24:50 +0000 Subject: [PATCH 08/12] Formatting --- vllm/core/scheduler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 0ed7678bb5815..b77f3e8418a87 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -808,10 +808,9 @@ def _schedule_priority_preemption( running_queue[-1]) > self._get_priority(seq_group): # Only preempt if waiting sequence cannot be allocated can_allocate = self.block_manager.can_allocate(seq_group) - if (num_new_tokens + if (num_new_tokens and can_allocate == AllocStatus.OK and budget.can_schedule(num_new_tokens=num_new_tokens, - num_new_seqs=num_new_seqs) - and can_allocate == AllocStatus.OK): + num_new_seqs=num_new_seqs)): break #Adjust budget to remove the victim sequence group From 2cb1c620a6439b4ddb10b252c47f3a10e0d7283c Mon Sep 17 00:00:00 2001 From: apatke Date: Fri, 13 Sep 2024 15:47:20 +0000 Subject: [PATCH 09/12] priority check --- vllm/engine/llm_engine.py | 6 ++++++ vllm/sequence.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index f51d72f1c1fd7..5bad51986eb7c 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -1099,6 +1099,12 @@ def add_request( if lora_request is not None and not self.lora_config: raise ValueError(f"Got lora_request {lora_request} but LoRA is " "not enabled!") + + if priority and not self.scheduler_config.scheduling_policy == "priority": + raise ValueError( + f"Got non-zero priority {priority} but Priority scheduling is not enabled." + ) + if arrival_time is None: arrival_time = time.time() diff --git a/vllm/sequence.py b/vllm/sequence.py index d84d0960dc937..6493eb9682db2 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -596,7 +596,7 @@ def __init__( encoder_seq: Optional[Sequence] = None, trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, - priority: Optional[int] = 0, + priority: Optional[int] = None, ) -> None: self.request_id = request_id self.seqs = seqs From 263e8f3ed7965082c3f464134674532224d681bb Mon Sep 17 00:00:00 2001 From: apatke Date: Fri, 13 Sep 2024 15:52:25 +0000 Subject: [PATCH 10/12] formatting --- vllm/engine/llm_engine.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 5bad51986eb7c..187d058a5eba6 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -1070,7 +1070,7 @@ def add_request( the current monotonic time. trace_headers: OpenTelemetry trace headers. priority: The priority of the request. - Only appicable with priority scheduling. + Only applicable with priority scheduling. Details: - Set arrival_time to the current time if it is None. @@ -1100,10 +1100,9 @@ def add_request( raise ValueError(f"Got lora_request {lora_request} but LoRA is " "not enabled!") - if priority and not self.scheduler_config.scheduling_policy == "priority": - raise ValueError( - f"Got non-zero priority {priority} but Priority scheduling is not enabled." 
- ) + if priority and not self.scheduler_config.policy == "priority": + raise ValueError(f"Got priority {priority} but " + "Priority scheduling is not enabled.") if arrival_time is None: arrival_time = time.time() From 343c062db67d8bd79bac8dd4713e5eca9e84abda Mon Sep 17 00:00:00 2001 From: apatke Date: Wed, 18 Sep 2024 19:01:58 +0000 Subject: [PATCH 11/12] change default priority to 0 --- vllm/core/scheduler.py | 4 ++-- vllm/engine/llm_engine.py | 10 +++++----- vllm/entrypoints/llm.py | 4 ++-- vllm/sequence.py | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index b77f3e8418a87..b48f941190a08 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -803,10 +803,10 @@ def _schedule_priority_preemption( SequenceStatus.WAITING, False, budget) - # Only preempt if priority inversion exists + #Only preempt if priority inversion exists while running_queue and self._get_priority( running_queue[-1]) > self._get_priority(seq_group): - # Only preempt if waiting sequence cannot be allocated + #Only preempt if waiting sequence cannot be allocated can_allocate = self.block_manager.can_allocate(seq_group) if (num_new_tokens and can_allocate == AllocStatus.OK and budget.can_schedule(num_new_tokens=num_new_tokens, diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 187d058a5eba6..381d0d7d0adfc 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -655,7 +655,7 @@ def _add_processed_request( lora_request: Optional[LoRARequest], prompt_adapter_request: Optional[PromptAdapterRequest], trace_headers: Optional[Mapping[str, str]] = None, - priority: Optional[int] = None, + priority: int = 0, ) -> None: self._validate_model_inputs(processed_inputs) # Create the sequences. @@ -1050,7 +1050,7 @@ def add_request( lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, - priority: Optional[int] = None, + priority: int = 0, ) -> None: """Add a request to the engine's request pool. 
@@ -1100,7 +1100,7 @@ def add_request( raise ValueError(f"Got lora_request {lora_request} but LoRA is " "not enabled!") - if priority and not self.scheduler_config.policy == "priority": + if priority > 0 and not self.scheduler_config.policy == "priority": raise ValueError(f"Got priority {priority} but " "Priority scheduling is not enabled.") @@ -1135,7 +1135,7 @@ def _create_sequence_group_with_sampling( trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, encoder_seq: Optional[Sequence] = None, - priority: Optional[int] = None, + priority: int = 0, ) -> SequenceGroup: """Creates a SequenceGroup with SamplingParams.""" max_logprobs = self.get_model_config().max_logprobs @@ -1176,7 +1176,7 @@ def _create_sequence_group_with_pooling( lora_request: Optional[LoRARequest], prompt_adapter_request: Optional[PromptAdapterRequest], encoder_seq: Optional[Sequence] = None, - priority: Optional[int] = None, + priority: int = 0, ) -> SequenceGroup: """Creates a SequenceGroup with PoolingParams.""" # Defensive copy of PoolingParams, which are used by the pooler diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py index 9e4a6f8fbcb69..da43447fc2987 100644 --- a/vllm/entrypoints/llm.py +++ b/vllm/entrypoints/llm.py @@ -648,7 +648,7 @@ def _validate_and_add_requests( lora_request=lora_request[i] if isinstance( lora_request, Sequence) else lora_request, prompt_adapter_request=prompt_adapter_request, - priority=priority[i] if priority else None, + priority=priority[i] if priority else 0, ) def _add_request( @@ -657,7 +657,7 @@ def _add_request( params: Union[SamplingParams, PoolingParams], lora_request: Optional[LoRARequest] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, - priority: Optional[int] = None, + priority: int = 0, ) -> None: request_id = str(next(self.request_counter)) self.llm_engine.add_request( diff --git a/vllm/sequence.py b/vllm/sequence.py index 6493eb9682db2..d066e23e55b54 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -596,7 +596,7 @@ def __init__( encoder_seq: Optional[Sequence] = None, trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, - priority: Optional[int] = None, + priority: int = 0, ) -> None: self.request_id = request_id self.seqs = seqs From 4e55c6dd2ee637cb304592f3e8c26785fb5dd3d0 Mon Sep 17 00:00:00 2001 From: apatke Date: Wed, 18 Sep 2024 19:05:55 +0000 Subject: [PATCH 12/12] minor optimization --- vllm/core/scheduler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index b48f941190a08..def2caac17f63 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -788,10 +788,9 @@ def _schedule_priority_preemption( A count of priority-based preemptions. """ - running_queue = self.running waiting_queue = self.waiting - running_queue = deque(sorted(running_queue, key=self._get_priority)) + running_queue = deque(sorted(self.running, key=self._get_priority)) blocks_to_swap_out: List[Tuple[int, int]] = [] force_preemption_count = 0
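Taken together, the series leaves `_schedule_default` with the following shape: prefill admission stays first-come-first-served, and the priority pass only runs when nothing could be prefilled. A condensed sketch of that control flow (callables stand in for the scheduler's own methods; budgets, LoRA handling and swapping are omitted):

from typing import Callable, List

def schedule_step(schedule_prefills: Callable[[], List[str]],
                  schedule_priority_preemption: Callable[[], int],
                  schedule_decodes: Callable[[], List[str]],
                  policy: str) -> List[str]:
    scheduled_prefills = schedule_prefills()
    if scheduled_prefills:
        # Decodes are not scheduled in the same step as prefills.
        return scheduled_prefills
    if policy == "priority":
        # Nothing could be prefilled: re-sort the queues and, if a priority
        # inversion exists, preempt low-priority running requests.
        schedule_priority_preemption()
    return schedule_decodes()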