Combine async postprocessor and multi-step - first WIP version #7743

Closed
4 changes: 2 additions & 2 deletions tests/core/test_chunked_prefill_scheduler.py
@@ -21,7 +21,7 @@ def append_new_token(seq_group, token_id: int):


def schedule_and_update_computed_tokens(scheduler):
-    metas, out = scheduler.schedule()
+    metas, out, _, _ = scheduler.schedule()
    for s, meta in zip(out.scheduled_seq_groups, metas):
        s.seq_group.update_num_computed_tokens(meta.token_chunk_size)
    return metas, out
@@ -180,7 +180,7 @@ def test_maximal_decoding():
    """Verify decoding requests are prioritized."""
    block_size = 4
    max_seqs = 2
-    max_model_len = 2
+    max_model_len = 8
    max_num_batched_tokens = 2
    scheduler_config = SchedulerConfig(max_num_batched_tokens,
                                       max_seqs,
2 changes: 1 addition & 1 deletion tests/core/utils.py
@@ -199,7 +199,7 @@ def append_new_token(out, token_id: int):


def schedule_and_update_computed_tokens(scheduler):
-    metas, out = scheduler.schedule()
+    metas, out, _, _ = scheduler.schedule()
    for s, meta in zip(out.scheduled_seq_groups, metas):
        s.seq_group.update_num_computed_tokens(meta.token_chunk_size)
    return metas, out
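Both helpers above adapt to the widened return signature of scheduler.schedule(), which now returns additional trailing values that these tests simply discard. A minimal sketch, not part of the PR (the names of the extra values are not visible in this diff), of a caller that tolerates either the old two-tuple or the new signature:

def schedule_compat(scheduler):
    # scheduler.schedule() used to return (metas, out); with this change it
    # returns extra trailing values, which the tests above ignore via `_`.
    result = scheduler.schedule()
    metas, out = result[0], result[1]
    return metas, out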
2 changes: 2 additions & 0 deletions tests/distributed/test_pipeline_parallel.py
@@ -49,6 +49,8 @@ def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME,
        str(TP_SIZE),
        "--distributed-executor-backend",
        DIST_BACKEND,
+        # disable output proc callback to test PP
+        "--disable-output-proc-callback",
    ]

    # compare without pipeline parallelism
2 changes: 2 additions & 0 deletions tests/distributed/test_pp_cudagraph.py
@@ -22,6 +22,8 @@ def test_pp_cudagraph(PP_SIZE, MODEL_NAME, ATTN_BACKEND):
        str(PP_SIZE),
        "--distributed-executor-backend",
        "mp",
+        # disable output proc callback to test PP
+        "--disable-output-proc-callback",
    ]
    os.environ["VLLM_ATTENTION_BACKEND"] = ATTN_BACKEND

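Both pipeline-parallel tests above opt out of the callback with the new --disable-output-proc-callback flag, because the callback is rejected when pipeline parallelism is enabled (see the NotImplementedError added to vllm/config.py below). A sketch of how such an argument list might be assembled, mirroring the tests; apart from --disable-output-proc-callback and --distributed-executor-backend, the flag names are taken from vLLM's existing CLI and are assumed unchanged here:

def pp_server_args(pp_size: int, backend: str) -> list:
    # Build server CLI args for a pipeline-parallel run, disabling the
    # output processor callback, which is not supported with PP in this PR.
    return [
        "--pipeline-parallel-size", str(pp_size),
        "--distributed-executor-backend", backend,
        # disable output proc callback to test PP
        "--disable-output-proc-callback",
    ]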
4 changes: 4 additions & 0 deletions tests/engine/test_stop_strings.py
@@ -98,6 +98,10 @@ def _test_stopping(llm_engine: LLMEngine,
    output: Optional[CompletionOutput] = None
    output_text = ""
    stop_reason = None
+
+    # Run first (because of async callback)
+    llm_engine.step()
+
    while llm_engine.has_unfinished_requests():
        (request_output, ) = llm_engine.step()
        (output, ) = request_output.outputs
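The test now calls llm_engine.step() once before its drain loop. With the output processor callback enabled, outputs appear to lag one step behind scheduling, so the first step() is used purely to prime the engine. A minimal sketch of the resulting drive pattern, assuming a request has already been added to llm_engine:

# Prime once: with the async callback, outputs for a scheduled step
# may only become available on the following step.
llm_engine.step()

while llm_engine.has_unfinished_requests():
    (request_output, ) = llm_engine.step()
    (output, ) = request_output.outputs
    # Inspect output.text and the stop reason here, as the test does.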
9 changes: 8 additions & 1 deletion tests/multi_step/test_correctness.py
@@ -60,7 +60,7 @@ async def test_multi_step(example_prompts, model: str, tp_size: int,

    server_args = DEFAULT_SERVER_ARGS + ["--enforce-eager"]
    ms_server_args = DEFAULT_SERVER_ARGS + \
-        ["--num-scheduler-steps", f"{num_scheduler_steps}"]
+        ["--num-scheduler-steps", f"{num_scheduler_steps}"]#, "--disable-output-proc-callback"]

    if eager_mode:
        ms_server_args.append("--enforce-eager")
@@ -82,4 +82,11 @@ def get_text_generations(completions):

    ref_generations = get_text_generations(ref_completions)
    test_generations = get_text_generations(test_completions)
+
+    print("ref_generations:")
+    for gen in ref_generations:
+        print("ref_gen: {}".format(gen))
+    print("test_generations:")
+    for gen in test_generations:
+        print("test_gen: {}".format(gen))
    assert ref_generations == test_generations
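The added prints dump every reference and multi-step generation when debugging a mismatch. A small alternative helper, not part of the PR, that reports only the first divergent prompt can be easier to scan for long prompt lists:

def first_divergence(ref_generations, test_generations):
    # Return (index, ref, test) for the first mismatch, or None if equal.
    for i, (ref, test) in enumerate(zip(ref_generations, test_generations)):
        if ref != test:
            return i, ref, test
    return None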
33 changes: 33 additions & 0 deletions vllm/config.py
@@ -137,6 +137,7 @@ def __init__(
        skip_tokenizer_init: bool = False,
        served_model_name: Optional[Union[str, List[str]]] = None,
        limit_mm_per_prompt: Optional[Mapping[str, int]] = None,
+        use_output_proc_callback: Optional[bool] = True,
    ) -> None:
        self.model = model
        self.tokenizer = tokenizer
@@ -167,6 +168,7 @@ def __init__(
                                    code_revision, rope_scaling, rope_theta)
        self.hf_text_config = get_hf_text_config(self.hf_config)
        self.dtype = _get_and_verify_dtype(self.hf_text_config, dtype)
+        self.use_output_proc_callback = use_output_proc_callback

        # Choose a default enforce_eager value if the user did not specify
        # a value (enforce_eager is None)
@@ -320,6 +322,30 @@ def _verify_cuda_graph(self) -> None:
        self.max_seq_len_to_capture = min(self.max_seq_len_to_capture,
                                          self.max_model_len)

+    def verify_output_proc_callback(self, speculative_config,
+                                    device_config) -> None:
+        if device_config.device_type != "cuda":
+            logger.warning(
+                "Output proc callback can only be enabled with CUDA")
+            self.use_output_proc_callback = False
+            return
+        if self.enforce_eager:
+            logger.warning(
+                "To see the benefits of the output processor callback, "
+                "enable CUDA graphs. Since enforce-eager is enabled, the "
+                "output processor callback cannot be used.")
+            self.use_output_proc_callback = not self.enforce_eager
+            return
+        # The async postprocessor is not necessary in embedding mode
+        # since there is no token generation.
+        if self.embedding_mode:
+            self.use_output_proc_callback = False
+
+        if speculative_config:
+            self.use_output_proc_callback = False
+
+        # TODO: assert mp backend
+
    def verify_with_parallel_config(
        self,
        parallel_config: "ParallelConfig",
@@ -352,6 +378,11 @@ def verify_with_parallel_config(
                           "fallback to the eager mode.")
            self.enforce_eager = True

+        if (pipeline_parallel_size > 1) and (self.use_output_proc_callback):
+            raise NotImplementedError(
+                "Output processor callback is not supported with "
+                "pipeline parallelism currently. Disable the callback.")
+
    def get_hf_config_sliding_window(self) -> Optional[int]:
        """Get the sliding window size, or None if disabled."""

@@ -1754,6 +1785,8 @@ class EngineConfig:
    def __post_init__(self):
        """Verify configs are valid & consistent with each other.
        """
+        self.model_config.verify_output_proc_callback(self.speculative_config,
+                                                      self.device_config)
        self.model_config.verify_with_parallel_config(self.parallel_config)
        self.cache_config.verify_with_parallel_config(self.parallel_config)

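Taken together, the checks added to ModelConfig keep the callback enabled only on CUDA, without enforce-eager (i.e. with CUDA graphs), outside embedding mode, without speculative decoding, and, via verify_with_parallel_config, without pipeline parallelism. A condensed, illustrative restatement of that decision (the PR implements it as the two methods above, not as a single function):

def output_proc_callback_allowed(device_type: str, enforce_eager: bool,
                                 embedding_mode: bool, has_spec_config: bool,
                                 pipeline_parallel_size: int) -> bool:
    if device_type != "cuda":
        return False  # callback is only enabled with CUDA
    if enforce_eager:
        return False  # only pays off when CUDA graphs are used
    if embedding_mode:
        return False  # no token generation to post-process
    if has_spec_config:
        return False  # speculative decoding is not supported
    if pipeline_parallel_size > 1:
        return False  # PP raises NotImplementedError instead
    return True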