Skip to content

Commit

Permalink
[Core][Compiled Graph] Fix shutdown error (ray-project#48280)
Browse files Browse the repository at this point in the history
Fixes remaining shutdown issues. It also fixes some failures in the nightly we accidently merged

Signed-off-by: mohitjain2504 <mohit.jain@dream11.com>
  • Loading branch information
rkooo567 authored and mohitjain2504 committed Nov 15, 2024
1 parent ba065c3 commit 18a5b81
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 114 deletions.
40 changes: 26 additions & 14 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
# Keep tracking of every compiled dag created during the lifetime of
# this process. It tracks them as weakref meaning when the compiled dag
# is GC'ed, it is automatically removed from here. It is used to teardown
# compiled dags at interpret shutdown time.
# compiled dags at interpreter shutdown time.
_compiled_dags = weakref.WeakValueDictionary()


Expand All @@ -66,10 +66,12 @@
# upon `ray.worker.shutdown` which is registered to atexit handler
# so that teardown is properly called before objects are destructed.
def _shutdown_all_compiled_dags():
global _compiled_dags
for _, compiled_dag in _compiled_dags.items():
# Kill DAG actors to avoid hanging during shutdown if the actor tasks
# cannot be cancelled.
compiled_dag.teardown(kill_actors=True)
_compiled_dags = weakref.WeakValueDictionary()


@DeveloperAPI
Expand Down Expand Up @@ -733,11 +735,17 @@ def _create_proxy_actor() -> "ray.actor.ActorHandle":
).remote()

self._proxy_actor = _create_proxy_actor()
# Set to True when `teardown` API is called.
self._is_teardown = False

@property
def nccl_group_id_p2p(self) -> Optional[str]:
return self._nccl_group_id_p2p

@property
def is_teardown(self) -> bool:
return self._is_teardown

@property
def nccl_group_ids(self) -> Set[str]:
return self._nccl_group_ids
Expand Down Expand Up @@ -1716,7 +1724,7 @@ def wait_teardown(self, kill_actors: bool = False):
except Exception:
pass

def teardown(self, wait: bool, kill_actors: bool = False):
def teardown(self, kill_actors: bool = False):
do_teardown = False
with self.in_teardown_lock:
if self._teardown_done:
Expand All @@ -1728,9 +1736,12 @@ def teardown(self, wait: bool, kill_actors: bool = False):

if not do_teardown:
# Teardown is already being performed.
if wait:
self.wait_teardown(kill_actors)
return
while True:
with self.in_teardown_lock:
if self._teardown_done:
return

time.sleep(0.1)

logger.info("Tearing down compiled DAG")
outer._dag_submitter.close()
Expand All @@ -1757,10 +1768,9 @@ def teardown(self, wait: bool, kill_actors: bool = False):
for nccl_group_id in outer._nccl_group_ids:
_destroy_nccl_group(nccl_group_id)

if wait:
logger.info("Waiting for worker tasks to exit")
self.wait_teardown()
logger.info("Teardown complete")
logger.info("Waiting for worker tasks to exit")
self.wait_teardown()
logger.info("Teardown complete")

with self.in_teardown_lock:
self._teardown_done = True
Expand All @@ -1770,7 +1780,7 @@ def run(self):
ray.get(list(outer.worker_task_refs.values()))
except Exception as e:
logger.debug(f"Handling exception from worker tasks: {e}")
self.teardown(wait=True)
self.teardown()

monitor = Monitor()
monitor.start()
Expand Down Expand Up @@ -2174,14 +2184,16 @@ def teardown(self, kill_actors: bool = False):
"""Teardown and cancel all actor tasks for this DAG. After this
function returns, the actors should be available to execute new tasks
or compile a new DAG."""
if self._is_teardown:
return

monitor = getattr(self, "_monitor", None)
if monitor is not None:
monitor.teardown(wait=True, kill_actors=kill_actors)
monitor.teardown(kill_actors=kill_actors)
self._is_teardown = True

def __del__(self):
monitor = getattr(self, "_monitor", None)
if monitor is not None:
monitor.teardown(wait=True)
self.teardown()


@DeveloperAPI
Expand Down
Loading

0 comments on commit 18a5b81

Please sign in to comment.