Skip to content

Commit

Permalink
[Core] cleanup zmq ipc sockets on exit (vllm-project#11115)
Browse files Browse the repository at this point in the history
Signed-off-by: Russell Bryant <rbryant@redhat.com>
  • Loading branch information
russellb authored and weilong.yu committed Dec 13, 2024
1 parent ff19114 commit e1de8c5
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 17 deletions.
9 changes: 9 additions & 0 deletions vllm/entrypoints/openai/api_server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import atexit
import importlib
import inspect
import multiprocessing
Expand Down Expand Up @@ -196,6 +197,14 @@ async def build_async_engine_client_from_engine_args(
assert engine_pid is not None, "Engine process failed to start."
logger.info("Started engine process with PID %d", engine_pid)

def _cleanup_ipc_path():
socket_path = ipc_path.replace("ipc://", "")
if os.path.exists(socket_path):
os.remove(socket_path)

# Ensure we clean up the local IPC socket file on exit.
atexit.register(_cleanup_ipc_path)

# Build RPCClient, which conforms to EngineClient Protocol.
engine_config = engine_args.create_engine_config()
build_client = partial(MQLLMEngineClient, ipc_path, engine_config,
Expand Down
16 changes: 14 additions & 2 deletions vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import signal
import threading
import time
from dataclasses import dataclass
from multiprocessing.process import BaseProcess
from typing import List, Tuple, Type, Union

Expand Down Expand Up @@ -129,6 +130,14 @@ def profile(self, is_start=True):
self.model_executor.profile(is_start)


@dataclass
class EngineCoreProcHandle:
proc: BaseProcess
ready_path: str
input_path: str
output_path: str


class EngineCoreProc(EngineCore):
"""ZMQ-wrapper for running EngineCore in background process."""

Expand Down Expand Up @@ -200,7 +209,7 @@ def make_engine_core_process(
input_path: str,
output_path: str,
ready_path: str,
) -> BaseProcess:
) -> EngineCoreProcHandle:
# The current process might have CUDA context,
# so we need to spawn a new process.
# NOTE(rob): this is a problem for using EngineCoreProc w/
Expand All @@ -222,7 +231,10 @@ def make_engine_core_process(

# Wait for startup
EngineCoreProc.wait_for_startup(proc, ready_path)
return proc
return EngineCoreProcHandle(proc=proc,
ready_path=ready_path,
input_path=input_path,
output_path=output_path)

@staticmethod
def run_engine_core(*args, **kwargs):
Expand Down
28 changes: 20 additions & 8 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import atexit
import os
from typing import List, Union

import msgspec
Expand Down Expand Up @@ -148,7 +149,7 @@ def __init__(
self.input_socket.bind(input_path)

# Start EngineCore in background process.
self.proc = EngineCoreProc.make_engine_core_process(
self.proc_handle = EngineCoreProc.make_engine_core_process(
*args,
input_path=input_path,
output_path=output_path,
Expand All @@ -161,13 +162,24 @@ def shutdown(self):
# Shut down the zmq context.
self.ctx.destroy(linger=0)

# Shutdown the process if needed.
if hasattr(self, "proc") and self.proc.is_alive():
self.proc.terminate()
self.proc.join(5)

if self.proc.is_alive():
kill_process_tree(self.proc.pid)
if hasattr(self, "proc_handle"):
# Shutdown the process if needed.
if self.proc_handle.proc.is_alive():
self.proc_handle.proc.terminate()
self.proc_handle.proc.join(5)

if self.proc_handle.proc.is_alive():
kill_process_tree(self.proc_handle.proc.pid)

# Remove zmq ipc socket files
ipc_sockets = [
self.proc_handle.ready_path, self.proc_handle.output_path,
self.proc_handle.input_path
]
for ipc_socket in ipc_sockets:
socket_file = ipc_socket.replace("ipc://", "")
if os.path.exists(socket_file):
os.remove(socket_file)

def __del__(self):
self.shutdown()
Expand Down
21 changes: 14 additions & 7 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,23 @@ def wait_for_termination(procs, timeout):

# Send SIGTERM if still running
active_procs = [w.proc for w in self.workers if w.proc.is_alive()]
self.workers = None
for p in active_procs:
p.terminate()
if wait_for_termination(active_procs, 4):
return
if not wait_for_termination(active_procs, 4):
# Send SIGKILL if still running
active_procs = [p for p in active_procs if p.is_alive()]
for p in active_procs:
p.kill()

# Send SIGKILL if still running
active_procs = [p for p in active_procs if p.is_alive()]
for p in active_procs:
p.kill()
self._cleanup_sockets()
self.workers = None

def _cleanup_sockets(self):
for w in self.workers:
# Remove the zmq ipc socket file
socket_path = w.ready_path.replace("ipc://", "")
if os.path.exists(socket_path):
os.remove(socket_path)

def shutdown(self):
"""Properly shut down the executor and its workers"""
Expand Down

0 comments on commit e1de8c5

Please sign in to comment.