diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index a345f8caeeed2..2e27224b41864 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -1,4 +1,5 @@ import asyncio +import atexit import importlib import inspect import multiprocessing @@ -196,6 +197,14 @@ async def build_async_engine_client_from_engine_args( assert engine_pid is not None, "Engine process failed to start." logger.info("Started engine process with PID %d", engine_pid) + def _cleanup_ipc_path(): + socket_path = ipc_path.replace("ipc://", "") + if os.path.exists(socket_path): + os.remove(socket_path) + + # Ensure we clean up the local IPC socket file on exit. + atexit.register(_cleanup_ipc_path) + # Build RPCClient, which conforms to EngineClient Protocol. engine_config = engine_args.create_engine_config() build_client = partial(MQLLMEngineClient, ipc_path, engine_config, diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 877a35e36427a..6246a0067842a 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -4,6 +4,7 @@ import signal import threading import time +from dataclasses import dataclass from multiprocessing.process import BaseProcess from typing import List, Tuple, Type, Union @@ -129,6 +130,14 @@ def profile(self, is_start=True): self.model_executor.profile(is_start) +@dataclass +class EngineCoreProcHandle: + proc: BaseProcess + ready_path: str + input_path: str + output_path: str + + class EngineCoreProc(EngineCore): """ZMQ-wrapper for running EngineCore in background process.""" @@ -200,7 +209,7 @@ def make_engine_core_process( input_path: str, output_path: str, ready_path: str, - ) -> BaseProcess: + ) -> EngineCoreProcHandle: # The current process might have CUDA context, # so we need to spawn a new process. # NOTE(rob): this is a problem for using EngineCoreProc w/ @@ -222,7 +231,10 @@ def make_engine_core_process( # Wait for startup EngineCoreProc.wait_for_startup(proc, ready_path) - return proc + return EngineCoreProcHandle(proc=proc, + ready_path=ready_path, + input_path=input_path, + output_path=output_path) @staticmethod def run_engine_core(*args, **kwargs): diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 1d5ddf4db4d7c..8eb9a27438d53 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,4 +1,5 @@ import atexit +import os from typing import List, Union import msgspec @@ -148,7 +149,7 @@ def __init__( self.input_socket.bind(input_path) # Start EngineCore in background process. - self.proc = EngineCoreProc.make_engine_core_process( + self.proc_handle = EngineCoreProc.make_engine_core_process( *args, input_path=input_path, output_path=output_path, @@ -161,13 +162,24 @@ def shutdown(self): # Shut down the zmq context. self.ctx.destroy(linger=0) - # Shutdown the process if needed. - if hasattr(self, "proc") and self.proc.is_alive(): - self.proc.terminate() - self.proc.join(5) - - if self.proc.is_alive(): - kill_process_tree(self.proc.pid) + if hasattr(self, "proc_handle"): + # Shutdown the process if needed. + if self.proc_handle.proc.is_alive(): + self.proc_handle.proc.terminate() + self.proc_handle.proc.join(5) + + if self.proc_handle.proc.is_alive(): + kill_process_tree(self.proc_handle.proc.pid) + + # Remove zmq ipc socket files + ipc_sockets = [ + self.proc_handle.ready_path, self.proc_handle.output_path, + self.proc_handle.input_path + ] + for ipc_socket in ipc_sockets: + socket_file = ipc_socket.replace("ipc://", "") + if os.path.exists(socket_file): + os.remove(socket_file) def __del__(self): self.shutdown() diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index f8f3d583618cf..63a12f791051f 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -172,16 +172,23 @@ def wait_for_termination(procs, timeout): # Send SIGTERM if still running active_procs = [w.proc for w in self.workers if w.proc.is_alive()] - self.workers = None for p in active_procs: p.terminate() - if wait_for_termination(active_procs, 4): - return + if not wait_for_termination(active_procs, 4): + # Send SIGKILL if still running + active_procs = [p for p in active_procs if p.is_alive()] + for p in active_procs: + p.kill() - # Send SIGKILL if still running - active_procs = [p for p in active_procs if p.is_alive()] - for p in active_procs: - p.kill() + self._cleanup_sockets() + self.workers = None + + def _cleanup_sockets(self): + for w in self.workers: + # Remove the zmq ipc socket file + socket_path = w.ready_path.replace("ipc://", "") + if os.path.exists(socket_path): + os.remove(socket_path) def shutdown(self): """Properly shut down the executor and its workers"""