Skip to content

Commit

Permalink
Parallelize output socket IO on client side
Browse files: browse the repository at this point in the history
Signed-off-by: Nick Hill <nhill@redhat.com>
  • Loading branch information
njhill committed Jan 22, 2025
1 parent 55dd119 commit 0e92b61
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
5 changes: 3 additions & 2 deletions vllm/v1/engine/async_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,10 @@ async def _run_output_handler(self):
# event loop for too long.
num_outputs = len(outputs.outputs)
if num_outputs <= OUTPUT_PROCESSING_CHUNK_SIZE:
slices = (outputs.outputs,)
slices = (outputs.outputs, )
else:
slices = np.array_split(outputs.outputs,
slices = np.array_split(
outputs.outputs,
math.ceil(num_outputs / OUTPUT_PROCESSING_CHUNK_SIZE))

iteration_stats = None
Expand Down
20 changes: 17 additions & 3 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import os
import signal
import weakref
from abc import ABC, abstractmethod
from typing import List, Type
from typing import List, Optional, Type

import msgspec
import zmq
Expand Down Expand Up @@ -242,10 +243,23 @@ def __init__(self, vllm_config: VllmConfig,
log_stats=True,
)

self.outputs_queue: Optional[asyncio.Queue[bytes]] = None
self.queue_task: Optional[asyncio.Task] = None

async def get_output_async(self) -> EngineCoreOutputs:
    """Return the next decoded message received on the output socket.

    On the first call this lazily creates ``self.outputs_queue`` and starts
    a background task (kept in ``self.queue_task``) that loops forever,
    awaiting single-frame multipart messages from ``self.output_socket``
    and pushing each frame's buffer onto the queue. Every call then awaits
    the next queued buffer and returns ``self.decoder.decode(...)`` of it.

    The background task is the only reader of the socket: this method must
    not call ``recv_multipart`` itself, otherwise two consumers would
    compete for the same messages and the queue would never be drained.
    """
    if self.outputs_queue is None:
        # Perform IO in separate task to parallelize as much as possible.
        # Bind the queue to a local so the closure below does not have to
        # go through the Optional attribute on every iteration.
        outputs_queue: asyncio.Queue[bytes] = asyncio.Queue()
        self.outputs_queue = outputs_queue

        async def process_outputs_socket():
            while True:
                # Exactly one frame per message is expected; the tuple
                # unpacking raises if the peer sends more or fewer.
                (frame, ) = await self.output_socket.recv_multipart(
                    copy=False)
                outputs_queue.put_nowait(frame.buffer)

        # Keep a reference on self so the task is not garbage collected.
        self.queue_task = asyncio.create_task(process_outputs_socket())

    return self.decoder.decode(await self.outputs_queue.get())

async def _send_input(self, request_type: EngineCoreRequestType,
request: EngineCoreRequestUnion) -> None:
Expand Down

0 comments on commit 0e92b61

Please sign in to comment.