Skip to content

Commit

Permalink
Raise ActorNotExist when no supervisors available (#2859)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored Mar 25, 2022
1 parent fabe7fa commit 4ad5491
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ test*.ipynb
*.log
*.swp

# nfs temp file
.nfs*

# docs
*.mo
docs/**/generated
Expand Down
24 changes: 23 additions & 1 deletion mars/deploy/kubernetes/tests/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,34 @@ def _build_docker_images(use_test_docker_file=True):


def _remove_docker_image(image_name, raises=True):
if "CI" not in os.environ:
# delete image iff in CI environment
return
proc = subprocess.Popen(["docker", "rmi", "-f", image_name])
if proc.wait() != 0 and raises:
raise SystemError("Executing docker rmi failed.")


def _load_docker_env():
if os.path.exists("/var/run/docker.sock") or not shutil.which("minikube"):
return

proc = subprocess.Popen(["minikube", "docker-env"], stdout=subprocess.PIPE)
proc.wait(30)
for line in proc.stdout:
line = line.decode().split("#", 1)[0]
line = line.strip() # type: str | bytes
export_pos = line.find("export")
if export_pos < 0:
continue
line = line[export_pos + 6 :].strip()
var, value = line.split("=", 1)
os.environ[var] = value.strip('"')


@contextmanager
def _start_kube_cluster(use_test_docker_file=True, **kwargs):
_load_docker_env()
image_name = _build_docker_images(use_test_docker_file=use_test_docker_file)

temp_spill_dir = tempfile.mkdtemp(prefix="test-mars-k8s-")
Expand Down Expand Up @@ -208,7 +229,7 @@ def _start_kube_cluster(use_test_docker_file=True, **kwargs):
_remove_docker_image(image_name, False)


@pytest.mark.parametrize("use_test_docker_file", [False, True])
@pytest.mark.parametrize("use_test_docker_file", [True, False])
@pytest.mark.skipif(not kube_available, reason="Cannot run without kubernetes")
def test_run_in_kubernetes(use_test_docker_file):
with _start_kube_cluster(
Expand Down Expand Up @@ -240,6 +261,7 @@ def test_run_in_kubernetes(use_test_docker_file):
new=lambda *_, **__: None,
)
def test_create_timeout():
_load_docker_env()
api_client = k8s_config.new_client_from_config()

cluster = None
Expand Down
6 changes: 5 additions & 1 deletion mars/services/cluster/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ async def get_supervisor_refs(self, uids: List[str]) -> List[mo.ActorRef]:
references of the actors
"""
addrs = await self.get_supervisors_by_keys(uids)
if any(addr is None for addr in addrs):
none_uid = next(uid for addr, uid in zip(addrs, uids) if addr is None)
raise mo.ActorNotExist(f"Actor {none_uid} not exist as no supervisors")

return await asyncio.gather(
*[mo.actor_ref(uid, address=addr) for addr, uid in zip(addrs, uids)]
)
Expand Down Expand Up @@ -155,7 +159,7 @@ async def get_nodes_info(
detail: bool = False,
statuses: Set[NodeStatus] = None,
exclude_statuses: Set[NodeStatus] = None,
):
) -> Dict[str, Dict]:
statuses = self._calc_statuses(statuses, exclude_statuses)
node_info_ref = await self._get_node_info_ref()
return await node_info_ref.get_nodes_info(
Expand Down
8 changes: 5 additions & 3 deletions mars/services/cluster/backends/fixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import AsyncGenerator, List, Optional
from typing import AsyncGenerator, List, Optional, Union

from ..core import NodeRole
from .base import AbstractClusterBackend, register_cluster_backend
Expand All @@ -22,8 +22,10 @@
class FixedClusterBackend(AbstractClusterBackend):
name = "fixed"

def __init__(self, lookup_address: str):
self._supervisors = [n.strip() for n in lookup_address.split(",")]
def __init__(self, lookup_address: Union[List[str], str]):
if isinstance(lookup_address, str):
lookup_address = lookup_address.split(",")
self._supervisors = [n.strip() for n in lookup_address]

@classmethod
async def create(
Expand Down
2 changes: 1 addition & 1 deletion mars/services/cluster/locator.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def get_supervisors(self, filter_ready: bool = True):

@mo.extensible
def get_supervisor(self, key: str, size=1):
if self._supervisors is None: # pragma: no cover
if not self._supervisors:
return None
elif size == 1:
return self._hash_ring.get_node(key)
Expand Down
30 changes: 29 additions & 1 deletion mars/services/cluster/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from ....utils import get_next_port
from ... import NodeRole
from ...web.supervisor import WebSupervisorService
from ..api import MockClusterAPI, WebClusterAPI
from ..api import ClusterAPI, MockClusterAPI, WebClusterAPI
from ..api.web import web_handlers
from ..core import NodeStatus

Expand Down Expand Up @@ -137,3 +137,31 @@ async def test_web_api(actor_pool):
await asyncio.wait_for(wait_async_gen(web_api.watch_all_bands()), timeout=0.1)

await MockClusterAPI.cleanup(pool_addr)


@pytest.mark.asyncio
async def test_no_supervisor(actor_pool):
pool_addr = actor_pool.external_address

from ..supervisor.locator import SupervisorPeerLocatorActor
from ..uploader import NodeInfoUploaderActor

await mo.create_actor(
SupervisorPeerLocatorActor,
"fixed",
[],
uid=SupervisorPeerLocatorActor.default_uid(),
address=pool_addr,
)
await mo.create_actor(
NodeInfoUploaderActor,
NodeRole.WORKER,
interval=1,
band_to_slots=None,
use_gpu=False,
uid=NodeInfoUploaderActor.default_uid(),
address=pool_addr,
)
api = await ClusterAPI.create(address=pool_addr)
with pytest.raises(mo.ActorNotExist):
await api.get_supervisor_refs(["KEY"])
1 change: 1 addition & 0 deletions mars/services/scheduling/worker/tests/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ async def __post_create__(self):
async def __pre_destroy__(self):
pass

@mo.extensible
async def update_subtask_resources(
self, band, session_id: str, subtask_id: str, resources: Resource
):
Expand Down
28 changes: 19 additions & 9 deletions mars/services/scheduling/worker/workerslot.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ async def __post_create__(self):
)
self._fresh_slots.add(slot_id)

self._usage_upload_task = self.ref().upload_slot_usages.tell_delay(
periodical=True, delay=1
)
self._upload_slot_usage_with_delay()

async def __pre_destroy__(self):
self._usage_upload_task.cancel()
Expand All @@ -131,9 +129,12 @@ async def _get_global_resource_ref(self):

from ..supervisor import GlobalResourceManagerActor

[self._global_resource_ref] = await self._cluster_api.get_supervisor_refs(
[GlobalResourceManagerActor.default_uid()]
)
try:
[self._global_resource_ref] = await self._cluster_api.get_supervisor_refs(
[GlobalResourceManagerActor.default_uid()]
)
except mo.ActorNotExist:
self._global_resource_ref = None
return self._global_resource_ref

def get_slot_address(self, slot_id: int):
Expand Down Expand Up @@ -241,10 +242,21 @@ async def restart_free_slots(self):
self._restarting = False
self._restart_done_event.set()

def _upload_slot_usage_with_delay(self, delay: int = 1):
self._usage_upload_task = self.ref().upload_slot_usages.tell_delay(
periodical=True, delay=delay
)

async def upload_slot_usages(self, periodical: bool = False):
delays = []
slot_infos = []
global_resource_ref = await self._get_global_resource_ref()

if global_resource_ref is None: # pragma: no cover
if periodical:
self._upload_slot_usage_with_delay()
return

for slot_id, proc in self._slot_to_proc.items():
if slot_id not in self._slot_to_session_stid:
continue
Expand Down Expand Up @@ -291,9 +303,7 @@ async def upload_slot_usages(self, periodical: bool = False):
await self._cluster_api.set_band_slot_infos(self._band_name, slot_infos)

if periodical:
self._usage_upload_task = self.ref().upload_slot_usages.tell_delay(
periodical=True, delay=1
)
self._upload_slot_usage_with_delay()

def dump_data(self):
"""
Expand Down
8 changes: 7 additions & 1 deletion mars/services/session/supervisor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
import functools
import time
from typing import Dict, List, Optional

from .... import oscar as mo
Expand All @@ -28,9 +29,11 @@ def __init__(self, service_config: Optional[Dict] = None):
self._session_refs: Dict[str, mo.ActorRef] = dict()
self._cluster_api: Optional[ClusterAPI] = None
self._service_config = service_config or dict()
self._stored_last_idle_time = None

async def __post_create__(self):
self._cluster_api = await ClusterAPI.create(self.address)
self._stored_last_idle_time = time.time()

async def __pre_destroy__(self):
await asyncio.gather(
Expand Down Expand Up @@ -124,7 +127,10 @@ async def get_last_idle_time(self, session_id=None):
if any(last_idle_time is None for last_idle_time in all_last_idle_time):
raise mo.Return(None)
else:
raise mo.Return(max(all_last_idle_time))
self._stored_last_idle_time = max(
[self._stored_last_idle_time] + all_last_idle_time
)
raise mo.Return(self._stored_last_idle_time)


class SessionActor(mo.Actor):
Expand Down
4 changes: 4 additions & 0 deletions mars/services/session/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import threading
import time

import pytest
import numpy as np
Expand Down Expand Up @@ -99,7 +100,10 @@ async def test_get_last_idle_time():
NodeRole.WORKER, config, address=worker_pool.external_address
)

start_time = time.time()
session_api = await SessionAPI.create(sv_pool.external_address)
assert await session_api.get_last_idle_time() < start_time

session_id = "test_session"
await session_api.create_session(session_id)
# check last idle time is not None
Expand Down

0 comments on commit 4ad5491

Please sign in to comment.