Skip to content

Commit

Permalink
Return None when no supervisors available
Browse files (browse the repository at this point in the history)
  • Loading branch information
wjsi committed Mar 23, 2022
1 parent 96af4fa commit ff5c180
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 7 deletions.
10 changes: 8 additions & 2 deletions mars/services/cluster/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,15 @@ async def get_supervisor_refs(self, uids: List[str]) -> List[mo.ActorRef]:
references of the actors
"""
addrs = await self.get_supervisors_by_keys(uids)
return await asyncio.gather(
*[mo.actor_ref(uid, address=addr) for addr, uid in zip(addrs, uids)]
gather_refs = await asyncio.gather(
*[
mo.actor_ref(uid, address=addr)
for addr, uid in zip(addrs, uids)
if addr is not None
]
)
ref_iter = iter(gather_refs)
return [next(ref_iter) if addr is not None else None for addr in addrs]

async def watch_supervisor_refs(self, uids: List[str]):
async for addrs in self.watch_supervisors_by_keys(uids):
Expand Down
8 changes: 5 additions & 3 deletions mars/services/cluster/backends/fixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import AsyncGenerator, List, Optional
from typing import AsyncGenerator, List, Optional, Union

from ..core import NodeRole
from .base import AbstractClusterBackend, register_cluster_backend
Expand All @@ -22,8 +22,10 @@
class FixedClusterBackend(AbstractClusterBackend):
name = "fixed"

def __init__(self, lookup_address: str):
self._supervisors = [n.strip() for n in lookup_address.split(",")]
def __init__(self, lookup_address: Union[List[str], str]):
if isinstance(lookup_address, str):
lookup_address = lookup_address.split(",")
self._supervisors = [n.strip() for n in lookup_address]

@classmethod
async def create(
Expand Down
2 changes: 1 addition & 1 deletion mars/services/cluster/locator.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def get_supervisors(self, filter_ready: bool = True):

@mo.extensible
def get_supervisor(self, key: str, size=1):
if self._supervisors is None: # pragma: no cover
if not self._supervisors:
return None
elif size == 1:
return self._hash_ring.get_node(key)
Expand Down
30 changes: 29 additions & 1 deletion mars/services/cluster/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from ....utils import get_next_port
from ... import NodeRole
from ...web.supervisor import WebSupervisorService
from ..api import MockClusterAPI, WebClusterAPI
from ..api import ClusterAPI, MockClusterAPI, WebClusterAPI
from ..api.web import web_handlers
from ..core import NodeStatus

Expand Down Expand Up @@ -137,3 +137,31 @@ async def test_web_api(actor_pool):
await asyncio.wait_for(wait_async_gen(web_api.watch_all_bands()), timeout=0.1)

await MockClusterAPI.cleanup(pool_addr)


@pytest.mark.asyncio
async def test_none_supervisor(actor_pool):
    """Supervisor ref lookup returns ``None`` when the supervisor list is empty.

    A ``SupervisorPeerLocatorActor`` is created with the ``"fixed"`` backend
    and an empty address list, alongside a worker-role
    ``NodeInfoUploaderActor``.  Asking the cluster API for the supervisor ref
    of a single key must then produce exactly one entry, and that entry must
    be ``None``.
    """
    address = actor_pool.external_address

    from ..supervisor.locator import SupervisorPeerLocatorActor
    from ..uploader import NodeInfoUploaderActor

    # Locator backed by the "fixed" backend with no supervisor addresses.
    locator_kwargs = dict(
        uid=SupervisorPeerLocatorActor.default_uid(),
        address=address,
    )
    await mo.create_actor(SupervisorPeerLocatorActor, "fixed", [], **locator_kwargs)

    # Worker-side uploader created in the same pool before the API is built.
    # NOTE(review): presumably ClusterAPI.create depends on this actor -- confirm.
    uploader_kwargs = dict(
        interval=1,
        band_to_slots=None,
        use_gpu=False,
        uid=NodeInfoUploaderActor.default_uid(),
        address=address,
    )
    await mo.create_actor(NodeInfoUploaderActor, NodeRole.WORKER, **uploader_kwargs)

    cluster_api = await ClusterAPI.create(address=address)
    supervisor_refs = await cluster_api.get_supervisor_refs(["KEY"])

    # Unpacking enforces that exactly one result came back for the one key.
    (only_ref,) = supervisor_refs
    assert only_ref is None
4 changes: 4 additions & 0 deletions mars/services/scheduling/worker/workerslot.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ async def upload_slot_usages(self, periodical: bool = False):
delays = []
slot_infos = []
global_resource_ref = await self._get_global_resource_ref()

if global_resource_ref is None and periodical: # pragma: no cover
return

for slot_id, proc in self._slot_to_proc.items():
if slot_id not in self._slot_to_session_stid:
continue
Expand Down

0 comments on commit ff5c180

Please sign in to comment.