Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise ActorNotExist when no supervisors available #2859

Merged
merged 2 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ test*.ipynb
*.log
*.swp

# nfs temp file
.nfs*

# docs
*.mo
docs/**/generated
Expand Down
24 changes: 23 additions & 1 deletion mars/deploy/kubernetes/tests/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,34 @@ def _build_docker_images(use_test_docker_file=True):


def _remove_docker_image(image_name, raises=True):
    """Force-remove a docker image, but only when running under CI.

    Local developer machines keep their image cache intact; CI workers
    clean up after themselves to save disk space.
    """
    if "CI" not in os.environ:
        # delete image iff in CI environment
        return
    returncode = subprocess.Popen(["docker", "rmi", "-f", image_name]).wait()
    if raises and returncode != 0:
        raise SystemError("Executing docker rmi failed.")


def _load_docker_env():
    """Populate os.environ from ``minikube docker-env``.

    Does nothing when a local docker daemon socket exists or when the
    ``minikube`` binary is not installed.
    """
    if os.path.exists("/var/run/docker.sock") or shutil.which("minikube") is None:
        return

    proc = subprocess.Popen(["minikube", "docker-env"], stdout=subprocess.PIPE)
    proc.wait(30)
    for raw_line in proc.stdout:
        # Strip shell comments first, then look for `export VAR=VALUE`.
        stripped = raw_line.decode().split("#", 1)[0].strip()
        export_pos = stripped.find("export")
        if export_pos < 0:
            continue
        assignment = stripped[export_pos + 6 :].strip()
        var, value = assignment.split("=", 1)
        os.environ[var] = value.strip('"')


@contextmanager
def _start_kube_cluster(use_test_docker_file=True, **kwargs):
_load_docker_env()
image_name = _build_docker_images(use_test_docker_file=use_test_docker_file)

temp_spill_dir = tempfile.mkdtemp(prefix="test-mars-k8s-")
Expand Down Expand Up @@ -208,7 +229,7 @@ def _start_kube_cluster(use_test_docker_file=True, **kwargs):
_remove_docker_image(image_name, False)


@pytest.mark.parametrize("use_test_docker_file", [False, True])
@pytest.mark.parametrize("use_test_docker_file", [True, False])
@pytest.mark.skipif(not kube_available, reason="Cannot run without kubernetes")
def test_run_in_kubernetes(use_test_docker_file):
with _start_kube_cluster(
Expand Down Expand Up @@ -240,6 +261,7 @@ def test_run_in_kubernetes(use_test_docker_file):
new=lambda *_, **__: None,
)
def test_create_timeout():
_load_docker_env()
api_client = k8s_config.new_client_from_config()

cluster = None
Expand Down
6 changes: 5 additions & 1 deletion mars/services/cluster/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ async def get_supervisor_refs(self, uids: List[str]) -> List[mo.ActorRef]:
references of the actors
"""
addrs = await self.get_supervisors_by_keys(uids)
if any(addr is None for addr in addrs):
none_uid = next(uid for addr, uid in zip(addrs, uids) if addr is None)
raise mo.ActorNotExist(f"Actor {none_uid} not exist as no supervisors")

return await asyncio.gather(
*[mo.actor_ref(uid, address=addr) for addr, uid in zip(addrs, uids)]
)
Expand Down Expand Up @@ -155,7 +159,7 @@ async def get_nodes_info(
detail: bool = False,
statuses: Set[NodeStatus] = None,
exclude_statuses: Set[NodeStatus] = None,
):
) -> Dict[str, Dict]:
statuses = self._calc_statuses(statuses, exclude_statuses)
node_info_ref = await self._get_node_info_ref()
return await node_info_ref.get_nodes_info(
Expand Down
8 changes: 5 additions & 3 deletions mars/services/cluster/backends/fixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import AsyncGenerator, List, Optional
from typing import AsyncGenerator, List, Optional, Union

from ..core import NodeRole
from .base import AbstractClusterBackend, register_cluster_backend
Expand All @@ -22,8 +22,10 @@
class FixedClusterBackend(AbstractClusterBackend):
name = "fixed"

def __init__(self, lookup_address: str):
self._supervisors = [n.strip() for n in lookup_address.split(",")]
def __init__(self, lookup_address: Union[List[str], str]):
if isinstance(lookup_address, str):
lookup_address = lookup_address.split(",")
self._supervisors = [n.strip() for n in lookup_address]

@classmethod
async def create(
Expand Down
2 changes: 1 addition & 1 deletion mars/services/cluster/locator.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def get_supervisors(self, filter_ready: bool = True):

@mo.extensible
def get_supervisor(self, key: str, size=1):
if self._supervisors is None: # pragma: no cover
if not self._supervisors:
return None
elif size == 1:
return self._hash_ring.get_node(key)
Expand Down
30 changes: 29 additions & 1 deletion mars/services/cluster/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from ....utils import get_next_port
from ... import NodeRole
from ...web.supervisor import WebSupervisorService
from ..api import MockClusterAPI, WebClusterAPI
from ..api import ClusterAPI, MockClusterAPI, WebClusterAPI
from ..api.web import web_handlers
from ..core import NodeStatus

Expand Down Expand Up @@ -137,3 +137,31 @@ async def test_web_api(actor_pool):
await asyncio.wait_for(wait_async_gen(web_api.watch_all_bands()), timeout=0.1)

await MockClusterAPI.cleanup(pool_addr)


@pytest.mark.asyncio
async def test_no_supervisor(actor_pool):
    address = actor_pool.external_address

    from ..supervisor.locator import SupervisorPeerLocatorActor
    from ..uploader import NodeInfoUploaderActor

    # A "fixed" locator configured with an empty supervisor list simulates
    # a cluster where no supervisor has come up yet.
    await mo.create_actor(
        SupervisorPeerLocatorActor,
        "fixed",
        [],
        uid=SupervisorPeerLocatorActor.default_uid(),
        address=address,
    )
    await mo.create_actor(
        NodeInfoUploaderActor,
        NodeRole.WORKER,
        interval=1,
        band_to_slots=None,
        use_gpu=False,
        uid=NodeInfoUploaderActor.default_uid(),
        address=address,
    )

    cluster_api = await ClusterAPI.create(address=address)
    # Resolving a supervisor ref with no supervisors available must raise
    # ActorNotExist instead of returning a ref with a None address.
    with pytest.raises(mo.ActorNotExist):
        await cluster_api.get_supervisor_refs(["KEY"])
1 change: 1 addition & 0 deletions mars/services/scheduling/worker/tests/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ async def __post_create__(self):
async def __pre_destroy__(self):
pass

@mo.extensible
async def update_subtask_resources(
self, band, session_id: str, subtask_id: str, resources: Resource
):
Expand Down
28 changes: 19 additions & 9 deletions mars/services/scheduling/worker/workerslot.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ async def __post_create__(self):
)
self._fresh_slots.add(slot_id)

self._usage_upload_task = self.ref().upload_slot_usages.tell_delay(
periodical=True, delay=1
)
self._upload_slot_usage_with_delay()

async def __pre_destroy__(self):
self._usage_upload_task.cancel()
Expand All @@ -131,9 +129,12 @@ async def _get_global_resource_ref(self):

from ..supervisor import GlobalResourceManagerActor

[self._global_resource_ref] = await self._cluster_api.get_supervisor_refs(
[GlobalResourceManagerActor.default_uid()]
)
try:
[self._global_resource_ref] = await self._cluster_api.get_supervisor_refs(
[GlobalResourceManagerActor.default_uid()]
)
except mo.ActorNotExist:
self._global_resource_ref = None
return self._global_resource_ref

def get_slot_address(self, slot_id: int):
Expand Down Expand Up @@ -241,10 +242,21 @@ async def restart_free_slots(self):
self._restarting = False
self._restart_done_event.set()

def _upload_slot_usage_with_delay(self, delay: int = 1):
    # Schedule a delayed, self-addressed `upload_slot_usages` call and keep
    # the task handle so it can be cancelled on actor destruction.
    self_ref = self.ref()
    self._usage_upload_task = self_ref.upload_slot_usages.tell_delay(
        periodical=True, delay=delay
    )

async def upload_slot_usages(self, periodical: bool = False):
delays = []
slot_infos = []
global_resource_ref = await self._get_global_resource_ref()

if global_resource_ref is None: # pragma: no cover
if periodical:
self._upload_slot_usage_with_delay()
return

for slot_id, proc in self._slot_to_proc.items():
if slot_id not in self._slot_to_session_stid:
continue
Expand Down Expand Up @@ -291,9 +303,7 @@ async def upload_slot_usages(self, periodical: bool = False):
await self._cluster_api.set_band_slot_infos(self._band_name, slot_infos)

if periodical:
self._usage_upload_task = self.ref().upload_slot_usages.tell_delay(
periodical=True, delay=1
)
self._upload_slot_usage_with_delay()

def dump_data(self):
"""
Expand Down
8 changes: 7 additions & 1 deletion mars/services/session/supervisor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
import functools
import time
from typing import Dict, List, Optional

from .... import oscar as mo
Expand All @@ -28,9 +29,11 @@ def __init__(self, service_config: Optional[Dict] = None):
self._session_refs: Dict[str, mo.ActorRef] = dict()
self._cluster_api: Optional[ClusterAPI] = None
self._service_config = service_config or dict()
self._stored_last_idle_time = None

async def __post_create__(self):
    # Record creation time as the idle-time baseline before wiring up the
    # cluster API, so get_last_idle_time never has to fall back to None.
    self._stored_last_idle_time = time.time()
    self._cluster_api = await ClusterAPI.create(self.address)

async def __pre_destroy__(self):
await asyncio.gather(
Expand Down Expand Up @@ -124,7 +127,10 @@ async def get_last_idle_time(self, session_id=None):
if any(last_idle_time is None for last_idle_time in all_last_idle_time):
raise mo.Return(None)
else:
raise mo.Return(max(all_last_idle_time))
self._stored_last_idle_time = max(
[self._stored_last_idle_time] + all_last_idle_time
)
raise mo.Return(self._stored_last_idle_time)


class SessionActor(mo.Actor):
Expand Down
4 changes: 4 additions & 0 deletions mars/services/session/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import threading
import time

import pytest
import numpy as np
Expand Down Expand Up @@ -99,7 +100,10 @@ async def test_get_last_idle_time():
NodeRole.WORKER, config, address=worker_pool.external_address
)

start_time = time.time()
session_api = await SessionAPI.create(sv_pool.external_address)
assert await session_api.get_last_idle_time() < start_time

session_id = "test_session"
await session_api.create_session(session_id)
# check last idle time is not None
Expand Down