Skip to content

Commit

Permalink
[BACKPORT] Make sure errors can be raised in Actor.__pre_destroy__ (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored Apr 1, 2022
1 parent 68060e2 commit 2152931
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 7 deletions.
3 changes: 2 additions & 1 deletion mars/core/entity/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def start(self):

def _thread_body(self):
from ...deploy.oscar.session import SyncSession
from ...oscar.errors import ActorNotExist

while True:
key, session_ref, fut = self._queue.get()
Expand All @@ -53,7 +54,7 @@ def _thread_body(self):
s = SyncSession.from_isolated_session(session)
s.decref(key)
fut.set_result(None)
except (RuntimeError, ConnectionError, KeyError):
except (RuntimeError, ConnectionError, KeyError, ActorNotExist):
fut.set_result(None)
except Exception as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except
fut.set_exception(ex)
Expand Down
6 changes: 6 additions & 0 deletions mars/deploy/oscar/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ async def _start_service(self):
)

async def stop(self):
from .session import SessionAPI

# delete all sessions
session_api = await SessionAPI.create(self._supervisor_pool.external_address)
await session_api.delete_all_sessions()

for worker_pool in self._worker_pools:
await stop_worker(worker_pool.external_address, self._config)
await stop_supervisor(self._supervisor_pool.external_address, self._config)
Expand Down
18 changes: 18 additions & 0 deletions mars/oscar/backends/mars/tests/test_mars_actor_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,21 @@ async def test_promise_chain(actor_pool_context):
call_log = await promise_test_ref.get_call_log()
assert len(call_log) == 2
assert call_log[1][0] - call_log[0][0] < 1


class ActorCannotDestroy(mo.Actor):
async def __pre_destroy__(self):
raise ValueError("Cannot destroy")


@pytest.mark.asyncio
@pytest.mark.parametrize("in_sub_pool", [True, False])
async def test_error_in_pre_destroy(actor_pool_context, in_sub_pool):
pool = actor_pool_context

strategy = None if not in_sub_pool else RandomSubPool()
a = await mo.create_actor(
ActorCannotDestroy, address=pool.external_address, strategy=strategy
)
with pytest.raises(ValueError, match="Cannot destroy"):
await mo.destroy_actor(a)
4 changes: 3 additions & 1 deletion mars/oscar/backends/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,9 @@ async def destroy_actor(self, message: DestroyActorMessage) -> ResultMessageType
return result
real_actor_ref = result.result
if real_actor_ref.address == self.external_address:
await super().destroy_actor(message)
result = await super().destroy_actor(message)
if result.message_type == MessageType.error:
return result
del self._allocated_actors[self.external_address][real_actor_ref]
return ResultMessage(
message.message_id, real_actor_ref.uid, protocol=message.protocol
Expand Down
6 changes: 6 additions & 0 deletions mars/services/session/api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ async def delete_session(self, session_id: str):
Session ID.
"""

@abstractmethod
async def delete_all_sessions(self):
"""
Delete all sessions.
"""

@abstractmethod
async def get_last_idle_time(
self, session_id: Union[str, None] = None
Expand Down
3 changes: 3 additions & 0 deletions mars/services/session/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ async def has_session(self, session_id: str) -> bool:
async def delete_session(self, session_id: str):
await self._session_manager_ref.delete_session(session_id)

async def delete_all_sessions(self):
await self._session_manager_ref.delete_all_sessions()

@alru_cache(cache_exceptions=False)
async def get_session_address(self, session_id: str) -> str:
"""
Expand Down
9 changes: 9 additions & 0 deletions mars/services/session/api/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ async def delete_session(self, session_id: str):
oscar_api = await self._get_oscar_session_api()
await oscar_api.delete_session(session_id)

@web_api("", method="delete")
async def delete_all_sessions(self):
oscar_api = await self._get_oscar_session_api()
await oscar_api.delete_all_sessions()

@web_api(
"(?P<session_id>[^/]+)", method="get", arg_filter={"action": "check_exist"}
)
Expand Down Expand Up @@ -135,6 +140,10 @@ async def delete_session(self, session_id: str):
addr = f"{self._address}/api/session/{session_id}"
await self._request_url(path=addr, method="DELETE")

async def delete_all_sessions(self):
addr = f"{self._address}/api/session"
await self._request_url(path=addr, method="DELETE")

async def has_session(self, session_id: str):
addr = f"{self._address}/api/session/{session_id}"
params = dict(action="check_exist")
Expand Down
9 changes: 8 additions & 1 deletion mars/services/session/supervisor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def has_session(self, session_id: str):

async def delete_session(self, session_id):
session_actor_ref = self._session_refs.pop(session_id)
await session_actor_ref.remove()
await mo.destroy_actor(session_actor_ref)

# sync removing to other managers
Expand All @@ -113,6 +114,10 @@ async def delete_session(self, session_id):
)
await session_manager_ref.remove_session_ref(session_id)

async def delete_all_sessions(self):
for session_id in list(self._session_refs):
await self.delete_session(session_id)

async def get_last_idle_time(self, session_id=None):
if session_id is not None:
session = self._session_refs[session_id]
Expand Down Expand Up @@ -160,10 +165,12 @@ async def __post_create__(self):
uid=CustomLogMetaActor.gen_uid(self._session_id),
)

async def __pre_destroy__(self):
async def remove(self):
await destroy_service_session(
NodeRole.SUPERVISOR, self._service_config, self._session_id, self.address
)

async def __pre_destroy__(self):
await mo.destroy_actor(self._custom_log_meta_ref)

async def create_services(self):
Expand Down
2 changes: 2 additions & 0 deletions mars/services/session/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ async def test_session_service(test_web):
await session_api.delete_session(session_id)
assert await session_api.has_session(session_id) is False
assert await session_api.get_sessions() == []
await session_api.delete_all_sessions()
assert await session_api.has_session(session_id) is False

await stop_services(NodeRole.SUPERVISOR, config, address=pool.external_address)

Expand Down
13 changes: 10 additions & 3 deletions mars/services/subtask/worker/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,16 @@ async def __post_create__(self):
self._cluster_api = await ClusterAPI.create(address=self.address)

async def __pre_destroy__(self):
await asyncio.gather(
*[mo.destroy_actor(ref) for ref in self._session_id_to_processors.values()]
)
try:
await asyncio.gather(
*[
mo.destroy_actor(ref)
for ref in self._session_id_to_processors.values()
]
)
except mo.ActorNotExist: # pragma: no cover
# deleted, ignore
pass

@classmethod
def _get_subtask_processor_cls(cls, subtask_processor_cls):
Expand Down
2 changes: 1 addition & 1 deletion mars/storage/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async def setup(cls, **kwargs) -> Tuple[Dict, Dict]:
@staticmethod
@implements(StorageBackend.teardown)
async def teardown(**kwargs):
object_ids = kwargs.get("object_ids")
object_ids = kwargs.get("object_ids") or ()
for object_id in object_ids:
try:
shm = SharedMemory(name=object_id)
Expand Down

0 comments on commit 2152931

Please sign in to comment.