From 10de6fb0ab267e90433aeb69b6bb6bc14ef65bb1 Mon Sep 17 00:00:00 2001
From: Antoni Baum
Date: Wed, 14 Feb 2024 12:52:50 -0800
Subject: [PATCH 1/5] Add metrics to RequestOutput

---
 tests/async_engine/test_request_tracker.py |  3 ++-
 vllm/core/scheduler.py                      |  5 ++++
 vllm/outputs.py                             | 29 +++++++++++++++++++++-
 vllm/sequence.py                            | 10 ++++++--
 4 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/tests/async_engine/test_request_tracker.py b/tests/async_engine/test_request_tracker.py
index 3e4d53c5cbe23..6ae53b11f864e 100644
--- a/tests/async_engine/test_request_tracker.py
+++ b/tests/async_engine/test_request_tracker.py
@@ -64,7 +64,8 @@ def test_request_tracker():
     stream_5 = tracker.add_request("5")
     assert tracker.new_requests_event.flag
     tracker.process_request_output(
-        RequestOutput("2", "output", [], [], [], finished=True))
+        RequestOutput("2", "output", [], [], [], bool(finished), 0, 0, 0, 0,
+                      0))
     new, finished = tracker.get_new_and_finished_requests()
     assert not tracker.new_requests_event.flag
     assert len(finished) == 1
diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py
index 4fdf9ec341cfd..3b2e82d0b772d 100644
--- a/vllm/core/scheduler.py
+++ b/vllm/core/scheduler.py
@@ -365,10 +365,15 @@ def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
         # This function call changes the internal states of the scheduler
         # such as self.running, self.swapped, and self.waiting.
         scheduler_outputs = self._schedule()
+        now = time.time()
 
         # Create input data structures.
         seq_group_metadata_list: List[SequenceGroupMetadata] = []
         for seq_group in scheduler_outputs.scheduled_seq_groups:
+            if seq_group.first_scheduled_time is None:
+                seq_group.first_scheduled_time = now
+                seq_group.time_in_queue = now - seq_group.arrival_time
+
             seq_data: Dict[int, SequenceData] = {}
             block_tables: Dict[int, List[int]] = {}
             for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
diff --git a/vllm/outputs.py b/vllm/outputs.py
index 534e9d5ea8a53..7b27196d30e80 100644
--- a/vllm/outputs.py
+++ b/vllm/outputs.py
@@ -1,4 +1,5 @@
 from typing import List, Optional
+import time
 
 from vllm.sequence import (PromptLogprobs, SampleLogprobs, SequenceGroup,
                            SequenceStatus)
@@ -60,6 +61,11 @@ class RequestOutput:
         prompt_logprobs: The log probabilities to return per prompt token.
         outputs: The output sequences of the request.
         finished: Whether the whole request is finished.
+        arrival_time: The time when the request arrived.
+        first_scheduled_time: The time when the request was first scheduled.
+        first_token_time: The time when the first token was generated.
+        time_in_queue: The time the request spent in the queue.
+        finished_time: The time when the request was finished.
         lora_request: The LoRA request that was used to generate the output.
     """
 
@@ -71,6 +77,11 @@ def __init__(
         prompt_logprobs: Optional[PromptLogprobs],
        outputs: List[CompletionOutput],
        finished: bool,
+        arrival_time: float,
+        first_scheduled_time: float,
+        first_token_time: float,
+        time_in_queue: float,
+        finished_time: Optional[float] = None,
         lora_request: Optional[LoRARequest] = None,
     ) -> None:
         self.request_id = request_id
@@ -79,6 +90,11 @@ def __init__(
         self.prompt_logprobs = prompt_logprobs
         self.outputs = outputs
         self.finished = finished
+        self.arrival_time = arrival_time
+        self.first_scheduled_time = first_scheduled_time
+        self.first_token_time = first_token_time
+        self.time_in_queue = time_in_queue
+        self.finished_time = finished_time
         self.lora_request = lora_request
 
     @classmethod
@@ -115,12 +131,18 @@ def from_seq_group(cls, seq_group: SequenceGroup) -> "RequestOutput":
         prompt_token_ids = seq_group.prompt_token_ids
         prompt_logprobs = seq_group.prompt_logprobs
         finished = seq_group.is_finished()
+        finished_time = time.time() if finished else None
         return cls(seq_group.request_id,
                    prompt,
                    prompt_token_ids,
                    prompt_logprobs,
                    outputs,
                    finished,
+                   arrival_time=seq_group.arrival_time,
+                   first_scheduled_time=seq_group.first_scheduled_time,
+                   first_token_time=seq_group.first_token_time,
+                   time_in_queue=seq_group.time_in_queue,
+                   finished_time=finished_time,
                    lora_request=seq_group.lora_request)
 
     def __repr__(self) -> str:
@@ -130,4 +152,9 @@ def __repr__(self) -> str:
                 f"prompt_logprobs={self.prompt_logprobs}, "
                 f"outputs={self.outputs}, "
                 f"finished={self.finished}, "
-                f"lora_request={self.lora_request})")
+                f"arrival_time={self.arrival_time}, "
+                f"first_scheduled_time={self.first_scheduled_time}, "
+                f"first_token_time={self.first_token_time}, "
+                f"time_in_queue={self.time_in_queue}, "
+                f"finished_time={self.finished_time}, "
+                f"lora_request={self.lora_request})")
\ No newline at end of file
diff --git a/vllm/sequence.py b/vllm/sequence.py
index 9669562cfeac5..caae8c7ad0a40 100644
--- a/vllm/sequence.py
+++ b/vllm/sequence.py
@@ -252,8 +252,11 @@ def __init__(
         self.request_id = request_id
         self.seqs_dict = {seq.seq_id: seq for seq in seqs}
         self.sampling_params = sampling_params
-        self.arrival_time = arrival_time
-        self.last_token_time = arrival_time
+        self.arrival_time: float = arrival_time
+        self.last_token_time: float = arrival_time
+        self.first_scheduled_time: Optional[float] = None
+        self.first_token_time: Optional[float] = None
+        self.time_in_queue: Optional[float] = None
         self.lora_request = lora_request
         self.prefix: Optional[Prefix] = prefix
         self.prompt_logprobs: Optional[PromptLogprobs] = None
@@ -276,6 +279,9 @@ def lora_int_id(self) -> int:
 
     def get_last_latency(self, now: float) -> float:
         """Gets last token latency for Request level timings."""
+        if self.first_token_time is None:
+            self.first_token_time = now
+
         latency = now - self.last_token_time
         self.last_token_time = now
         return latency

From 539f844e006989b66ae9220cb120a7cea876a448 Mon Sep 17 00:00:00 2001
From: Antoni Baum
Date: Tue, 20 Feb 2024 11:26:07 -0800
Subject: [PATCH 2/5] Apply feedback from code review

---
 tests/async_engine/test_request_tracker.py |  3 +-
 vllm/engine/llm_engine.py                   |  2 +
 vllm/outputs.py                             | 56 ++++++++++++----------
 vllm/sequence.py                            |  8 ++--
 4 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/tests/async_engine/test_request_tracker.py b/tests/async_engine/test_request_tracker.py
index 6ae53b11f864e..4043558bae919 100644
--- a/tests/async_engine/test_request_tracker.py
+++ b/tests/async_engine/test_request_tracker.py
@@ -64,8 +64,7 @@ def test_request_tracker():
     stream_5 = tracker.add_request("5")
     assert tracker.new_requests_event.flag
     tracker.process_request_output(
-        RequestOutput("2", "output", [], [], [], bool(finished), 0, 0, 0, 0,
-                      0))
+        RequestOutput("2", "output", [], [], [], bool(finished)))
     new, finished = tracker.get_new_and_finished_requests()
     assert not tracker.new_requests_event.flag
     assert len(finished) == 1
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 86f0925209309..69e501c045919 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -725,6 +725,7 @@ def _process_sequence_group_outputs(self, seq_group: SequenceGroup,
     def _process_model_outputs(
             self, output: SamplerOutput,
             scheduler_outputs: SchedulerOutputs) -> List[RequestOutput]:
+        now = time.time()
         # Update the scheduled sequence groups with the model outputs.
         scheduled_seq_groups = scheduler_outputs.scheduled_seq_groups
         for seq_group, outputs in zip(scheduled_seq_groups, output):
@@ -736,6 +737,7 @@ def _process_model_outputs(
         # Create the outputs.
         request_outputs: List[RequestOutput] = []
         for seq_group in scheduled_seq_groups:
+            seq_group.set_first_token_time(now)
             request_output = RequestOutput.from_seq_group(seq_group)
             request_outputs.append(request_output)
         for seq_group in scheduler_outputs.ignored_seq_groups:
diff --git a/vllm/outputs.py b/vllm/outputs.py
index 7b27196d30e80..c013c7b6475b7 100644
--- a/vllm/outputs.py
+++ b/vllm/outputs.py
@@ -1,3 +1,4 @@
+from dataclasses import dataclass
 from typing import List, Optional
 import time
 
@@ -51,6 +52,24 @@ def __repr__(self) -> str:
                 f"finish_reason={self.finish_reason})")
 
 
+@dataclass
+class RequestOutputMetrics:
+    """Metrics associated with a request.
+
+    Args:
+        arrival_time: The time when the request arrived.
+        first_scheduled_time: The time when the request was first scheduled.
+        first_token_time: The time when the first token was generated.
+        time_in_queue: The time the request spent in the queue.
+        finished_time: The time when the request was finished.
+    """
+    arrival_time: float
+    first_scheduled_time: float
+    first_token_time: float
+    time_in_queue: float
+    finished_time: Optional[float] = None
+
+
 class RequestOutput:
     """The output data of a request to the LLM.
 
@@ -61,11 +80,7 @@ class RequestOutput:
         prompt_logprobs: The log probabilities to return per prompt token.
         outputs: The output sequences of the request.
         finished: Whether the whole request is finished.
-        arrival_time: The time when the request arrived.
-        first_scheduled_time: The time when the request was first scheduled.
-        first_token_time: The time when the first token was generated.
-        time_in_queue: The time the request spent in the queue.
-        finished_time: The time when the request was finished.
+        metrics: Metrics associated with the request.
         lora_request: The LoRA request that was used to generate the output.
     """
 
@@ -77,11 +92,7 @@ def __init__(
         prompt_logprobs: Optional[PromptLogprobs],
         outputs: List[CompletionOutput],
         finished: bool,
-        arrival_time: float,
-        first_scheduled_time: float,
-        first_token_time: float,
-        time_in_queue: float,
-        finished_time: Optional[float] = None,
+        metrics: Optional[RequestOutputMetrics] = None,
         lora_request: Optional[LoRARequest] = None,
     ) -> None:
         self.request_id = request_id
@@ -90,11 +101,7 @@ def __init__(
         self.prompt_logprobs = prompt_logprobs
         self.outputs = outputs
         self.finished = finished
-        self.arrival_time = arrival_time
-        self.first_scheduled_time = first_scheduled_time
-        self.first_token_time = first_token_time
-        self.time_in_queue = time_in_queue
-        self.finished_time = finished_time
+        self.metrics = metrics
         self.lora_request = lora_request
 
     @classmethod
@@ -138,11 +145,12 @@ def from_seq_group(cls, seq_group: SequenceGroup) -> "RequestOutput":
                    prompt_logprobs,
                    outputs,
                    finished,
-                   arrival_time=seq_group.arrival_time,
-                   first_scheduled_time=seq_group.first_scheduled_time,
-                   first_token_time=seq_group.first_token_time,
-                   time_in_queue=seq_group.time_in_queue,
-                   finished_time=finished_time,
+                   metrics=RequestOutputMetrics(
+                       arrival_time=seq_group.arrival_time,
+                       first_scheduled_time=seq_group.first_scheduled_time,
+                       first_token_time=seq_group.first_token_time,
+                       time_in_queue=seq_group.time_in_queue,
+                       finished_time=finished_time),
                    lora_request=seq_group.lora_request)
 
     def __repr__(self) -> str:
@@ -152,9 +160,5 @@ def __repr__(self) -> str:
                 f"prompt_logprobs={self.prompt_logprobs}, "
                 f"outputs={self.outputs}, "
                 f"finished={self.finished}, "
-                f"arrival_time={self.arrival_time}, "
-                f"first_scheduled_time={self.first_scheduled_time}, "
-                f"first_token_time={self.first_token_time}, "
-                f"time_in_queue={self.time_in_queue}, "
-                f"finished_time={self.finished_time}, "
-                f"lora_request={self.lora_request})")
\ No newline at end of file
+                f"metrics={self.metrics}, "
+                f"lora_request={self.lora_request})")
diff --git a/vllm/sequence.py b/vllm/sequence.py
index caae8c7ad0a40..c3a6f2f6c02bf 100644
--- a/vllm/sequence.py
+++ b/vllm/sequence.py
@@ -279,13 +279,15 @@ def lora_int_id(self) -> int:
 
     def get_last_latency(self, now: float) -> float:
         """Gets last token latency for Request level timings."""
-        if self.first_token_time is None:
-            self.first_token_time = now
-
         latency = now - self.last_token_time
         self.last_token_time = now
         return latency
 
+    def set_first_token_time(self, now: float) -> None:
+        """Sets the first token time for Request level timings."""
+        if self.first_token_time is None:
+            self.first_token_time = now
+
     def get_max_num_running_seqs(self) -> int:
         """The maximum number of sequences running in parallel in the remaining
         lifetime of the request."""

From f5029aaf51bf45e57c04672ec7e6802ca4a1f2e4 Mon Sep 17 00:00:00 2001
From: Antoni Baum
Date: Tue, 20 Feb 2024 11:28:38 -0800
Subject: [PATCH 3/5] Tweak

---
 vllm/core/scheduler.py    | 4 +---
 vllm/engine/llm_engine.py | 2 +-
 vllm/sequence.py          | 8 +++++++-
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py
index 3b2e82d0b772d..5dde9097a3d57 100644
--- a/vllm/core/scheduler.py
+++ b/vllm/core/scheduler.py
@@ -370,9 +370,7 @@ def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
         # Create input data structures.
         seq_group_metadata_list: List[SequenceGroupMetadata] = []
         for seq_group in scheduler_outputs.scheduled_seq_groups:
-            if seq_group.first_scheduled_time is None:
-                seq_group.first_scheduled_time = now
-                seq_group.time_in_queue = now - seq_group.arrival_time
+            seq_group.maybe_set_first_scheduled_time(now)
 
             seq_data: Dict[int, SequenceData] = {}
             block_tables: Dict[int, List[int]] = {}
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 69e501c045919..70b081fe53e7e 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -737,7 +737,7 @@ def _process_model_outputs(
         # Create the outputs.
         request_outputs: List[RequestOutput] = []
         for seq_group in scheduled_seq_groups:
-            seq_group.set_first_token_time(now)
+            seq_group.maybe_set_first_token_time(now)
             request_output = RequestOutput.from_seq_group(seq_group)
             request_outputs.append(request_output)
         for seq_group in scheduler_outputs.ignored_seq_groups:
diff --git a/vllm/sequence.py b/vllm/sequence.py
index c3a6f2f6c02bf..cb2c047d671bc 100644
--- a/vllm/sequence.py
+++ b/vllm/sequence.py
@@ -283,11 +283,17 @@ def get_last_latency(self, now: float) -> float:
         self.last_token_time = now
         return latency
 
-    def set_first_token_time(self, now: float) -> None:
+    def maybe_set_first_token_time(self, now: float) -> None:
         """Sets the first token time for Request level timings."""
         if self.first_token_time is None:
             self.first_token_time = now
 
+    def maybe_set_first_scheduled_time(self, now: float) -> None:
+        """Sets the first scheduled time and time in queue for Request level timings."""
+        if self.first_scheduled_time is None:
+            self.first_scheduled_time = now
+            self.time_in_queue = now - self.arrival_time
+
     def get_max_num_running_seqs(self) -> int:
         """The maximum number of sequences running in parallel in the remaining
         lifetime of the request."""

From 7bec4be860ab53d8996029d981432eb75901cdcc Mon Sep 17 00:00:00 2001
From: Antoni Baum
Date: Tue, 20 Feb 2024 14:37:46 -0800
Subject: [PATCH 4/5] Reuse dataclass

---
 vllm/engine/llm_engine.py |  5 ++--
 vllm/outputs.py           | 31 +++--------------------
 vllm/sequence.py          | 52 ++++++++++++++++++++++++++++-----------
 3 files changed, 45 insertions(+), 43 deletions(-)

diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 70b081fe53e7e..d72707a357c1c 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -873,11 +873,12 @@ def _get_stats(self,
             # Latency Timings.
             time_last_iters = []
             for seq_group in scheduler_outputs.scheduled_seq_groups:
-                # Time since last token. (n.b. updates seq_group.last_token_time)
+                # Time since last token. (n.b. updates seq_group.metrics.last_token_time)
                 time_last_iters.append(seq_group.get_last_latency(now))
                 # Time since arrival for all finished requests.
                 if seq_group.is_finished():
-                    time_e2e_requests.append(now - seq_group.arrival_time)
+                    time_e2e_requests.append(now -
+                                             seq_group.metrics.arrival_time)
 
             time_to_first_tokens = time_last_iters if prompt_run else []
             time_per_output_tokens = [] if prompt_run else time_last_iters
diff --git a/vllm/outputs.py b/vllm/outputs.py
index c013c7b6475b7..a6de2a5a2257b 100644
--- a/vllm/outputs.py
+++ b/vllm/outputs.py
@@ -1,9 +1,8 @@
-from dataclasses import dataclass
 from typing import List, Optional
 import time
 
 from vllm.sequence import (PromptLogprobs, SampleLogprobs, SequenceGroup,
-                           SequenceStatus)
+                           SequenceStatus, RequestMetrics)
 from vllm.lora.request import LoRARequest
 
 
@@ -52,24 +51,6 @@ def __repr__(self) -> str:
                 f"finish_reason={self.finish_reason})")
 
 
-@dataclass
-class RequestOutputMetrics:
-    """Metrics associated with a request.
-
-    Args:
-        arrival_time: The time when the request arrived.
-        first_scheduled_time: The time when the request was first scheduled.
-        first_token_time: The time when the first token was generated.
-        time_in_queue: The time the request spent in the queue.
-        finished_time: The time when the request was finished.
-    """
-    arrival_time: float
-    first_scheduled_time: float
-    first_token_time: float
-    time_in_queue: float
-    finished_time: Optional[float] = None
-
-
 class RequestOutput:
     """The output data of a request to the LLM.
 
@@ -92,7 +73,7 @@ def __init__(
         prompt_logprobs: Optional[PromptLogprobs],
         outputs: List[CompletionOutput],
         finished: bool,
-        metrics: Optional[RequestOutputMetrics] = None,
+        metrics: Optional[RequestMetrics] = None,
         lora_request: Optional[LoRARequest] = None,
     ) -> None:
         self.request_id = request_id
@@ -139,18 +120,14 @@ def from_seq_group(cls, seq_group: SequenceGroup) -> "RequestOutput":
         prompt_logprobs = seq_group.prompt_logprobs
         finished = seq_group.is_finished()
         finished_time = time.time() if finished else None
+        seq_group.set_finished_time(finished_time)
         return cls(seq_group.request_id,
                    prompt,
                    prompt_token_ids,
                    prompt_logprobs,
                    outputs,
                    finished,
-                   metrics=RequestOutputMetrics(
-                       arrival_time=seq_group.arrival_time,
-                       first_scheduled_time=seq_group.first_scheduled_time,
-                       first_token_time=seq_group.first_token_time,
-                       time_in_queue=seq_group.time_in_queue,
-                       finished_time=finished_time),
+                   seq_group.metrics,
                    lora_request=seq_group.lora_request)
 
     def __repr__(self) -> str:
diff --git a/vllm/sequence.py b/vllm/sequence.py
index cb2c047d671bc..44adb058a5ba5 100644
--- a/vllm/sequence.py
+++ b/vllm/sequence.py
@@ -1,6 +1,7 @@
 """Sequence and its related classes."""
 import copy
 import enum
+from dataclasses import dataclass
 from typing import Dict, List, Optional, Union
 
 from vllm.block import LogicalTokenBlock
@@ -49,6 +50,25 @@ def get_finished_reason(status: "SequenceStatus") -> Union[str, None]:
         return finish_reason
 
 
+@dataclass
+class RequestMetrics:
+    """Metrics associated with a request.
+
+    Args:
+        arrival_time: The time when the request arrived.
+        first_scheduled_time: The time when the request was first scheduled.
+        first_token_time: The time when the first token was generated.
+        time_in_queue: The time the request spent in the queue.
+        finished_time: The time when the request was finished.
+    """
+    arrival_time: float
+    last_token_time: float
+    first_scheduled_time: Optional[float]
+    first_token_time: Optional[float]
+    time_in_queue: Optional[float]
+    finished_time: Optional[float] = None
+
+
 class SequenceData:
     """Data associated with a sequence.
 
@@ -252,11 +272,11 @@ def __init__(
         self.request_id = request_id
         self.seqs_dict = {seq.seq_id: seq for seq in seqs}
         self.sampling_params = sampling_params
-        self.arrival_time: float = arrival_time
-        self.last_token_time: float = arrival_time
-        self.first_scheduled_time: Optional[float] = None
-        self.first_token_time: Optional[float] = None
-        self.time_in_queue: Optional[float] = None
+        self.metrics = RequestMetrics(arrival_time=arrival_time,
+                                      last_token_time=arrival_time,
+                                      first_scheduled_time=None,
+                                      first_token_time=None,
+                                      time_in_queue=None)
         self.lora_request = lora_request
         self.prefix: Optional[Prefix] = prefix
         self.prompt_logprobs: Optional[PromptLogprobs] = None
@@ -279,20 +299,24 @@ def lora_int_id(self) -> int:
     def get_last_latency(self, now: float) -> float:
         """Gets last token latency for Request level timings."""
-        latency = now - self.last_token_time
-        self.last_token_time = now
+        latency = now - self.metrics.last_token_time
+        self.metrics.last_token_time = now
         return latency
 
-    def maybe_set_first_token_time(self, now: float) -> None:
+    def maybe_set_first_token_time(self, time: float) -> None:
         """Sets the first token time for Request level timings."""
-        if self.first_token_time is None:
-            self.first_token_time = now
+        if self.metrics.first_token_time is None:
+            self.metrics.first_token_time = time
 
-    def maybe_set_first_scheduled_time(self, now: float) -> None:
+    def maybe_set_first_scheduled_time(self, time: float) -> None:
         """Sets the first scheduled time and time in queue for Request level timings."""
-        if self.first_scheduled_time is None:
-            self.first_scheduled_time = now
-            self.time_in_queue = now - self.arrival_time
+        if self.metrics.first_scheduled_time is None:
+            self.metrics.first_scheduled_time = time
+            self.metrics.time_in_queue = time - self.metrics.arrival_time
+
+    def set_finished_time(self, time: Optional[float]) -> None:
+        """Sets the finished time for Request level timings."""
+        self.metrics.finished_time = time
 
     def get_max_num_running_seqs(self) -> int:
         """The maximum number of sequences running in parallel in the remaining
         lifetime of the request."""

From c1269b51c26b87311a2f0bd36e9611fb85e3ac00 Mon Sep 17 00:00:00 2001
From: Antoni Baum
Date: Tue, 20 Feb 2024 14:57:06 -0800
Subject: [PATCH 5/5] Fix

---
 vllm/core/policy.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vllm/core/policy.py b/vllm/core/policy.py
index 99f183b42c8b4..2e9ebbda54412 100644
--- a/vllm/core/policy.py
+++ b/vllm/core/policy.py
@@ -33,7 +33,7 @@ def get_priority(
         self,
         now: float,
         seq_group: SequenceGroup,
     ) -> float:
-        return now - seq_group.arrival_time
+        return now - seq_group.metrics.arrival_time
 
 
 class PolicyFactory:
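Usage sketch (illustrative, not part of the series above): reading the request-level timings that these patches attach to RequestOutput, assuming the offline LLM API of this vLLM version. The model name and sampling settings below are placeholders chosen for the example.

    from vllm import LLM, SamplingParams

    llm = LLM(model="facebook/opt-125m")  # placeholder model for a quick smoke test
    outputs = llm.generate(["Hello, my name is"],
                           SamplingParams(max_tokens=16))

    for out in outputs:
        m = out.metrics  # RequestMetrics dataclass populated by the engine
        if m is None or m.first_token_time is None or m.finished_time is None:
            continue  # metrics not (fully) populated for this request
        queued = m.time_in_queue or 0.0              # seconds spent waiting to be scheduled
        ttft = m.first_token_time - m.arrival_time   # time to first token
        e2e = m.finished_time - m.arrival_time       # end-to-end request latency
        print(f"{out.request_id}: queue={queued:.3f}s "
              f"ttft={ttft:.3f}s e2e={e2e:.3f}s")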