[scheduling] Expand slot scheduler to resource scheduler #2846

Merged
56 commits merged on Mar 28, 2022
Changes from all commits
56 commits
09cfdfc
accurate resource management for global slot manager
chaokunyang Feb 21, 2022
74e6b47
fix tests
chaokunyang Mar 2, 2022
720f833
fix ci failures
chaokunyang Mar 7, 2022
98b2ab8
lint
chaokunyang Mar 7, 2022
52e4eac
fix tests
chaokunyang Mar 8, 2022
c0075dd
rename slot to resource
chaokunyang Mar 8, 2022
accdc4f
fix test_global_resource
chaokunyang Mar 9, 2022
90c05db
accurate resource management for global slot manager
chaokunyang Feb 21, 2022
0f1fcb7
fix tests
chaokunyang Mar 2, 2022
70b1f9c
fix ci failures
chaokunyang Mar 7, 2022
d65564e
lint
chaokunyang Mar 7, 2022
b0e7748
fix tests
chaokunyang Mar 8, 2022
abb5f54
rename slot to resource
chaokunyang Mar 8, 2022
341576c
fix test_global_resource
chaokunyang Mar 9, 2022
3cbf72f
Merge remote-tracking branch 'mars/master' into new_rm_for_slot_manager
chaokunyang Mar 10, 2022
00a2ae1
Update mars/services/_core.pyx
chaokunyang Mar 10, 2022
076805b
rename global_slot_ref to global_resource_ref
chaokunyang Mar 14, 2022
91f15fb
move _resource.pyx to outer dir
chaokunyang Mar 14, 2022
10ef8e4
Merge remote-tracking branch 'chaokun/new_rm_for_slot_manager' into n…
chaokunyang Mar 14, 2022
a4292d7
fix tests
chaokunyang Mar 15, 2022
ee2a878
Merge remote-tracking branch 'upstream/master'
zhongchun Mar 15, 2022
cd0336d
Upgrade slot scheduler to resource scheduler
zhongchun Mar 10, 2022
b814aa6
Fix gather_node_resource
zhongchun Mar 10, 2022
374e152
Fix tests
zhongchun Mar 15, 2022
96933e7
rename global_slot_ref to global_resource_ref
chaokunyang Mar 14, 2022
0951c1c
move _resource.pyx to outer dir
chaokunyang Mar 14, 2022
1adb675
fix tests
chaokunyang Mar 15, 2022
da9a89c
Fix imports
zhongchun Mar 15, 2022
a8f0500
Restore mars/services/web/ui/package-lock.json
zhongchun Mar 15, 2022
649232a
Lint
zhongchun Mar 15, 2022
3984075
Fix tests
zhongchun Mar 15, 2022
ce8875c
Fix tests
zhongchun Mar 16, 2022
c95be57
Fix test_service.py
zhongchun Mar 16, 2022
f559b62
Merge remote-tracking branch 'mars/master' into refine_autoscaling
chaokunyang Mar 16, 2022
26f5dbf
Merge remote-tracking branch 'mars/master' into refine_autoscaling
chaokunyang Mar 16, 2022
76a637a
lint
chaokunyang Mar 16, 2022
e74edfa
Merge remote-tracking branch 'chaokunyang/new_rm_for_slot_manager' in…
zhongchun Mar 16, 2022
cffcee0
Fix ray tests
zhongchun Mar 17, 2022
3add063
Merge branch 'feat-resource-scheduler'
zhongchun Mar 22, 2022
b8007f1
Fix tests
zhongchun Mar 22, 2022
89c79ca
Remove unused method and fix workerslot
zhongchun Mar 22, 2022
d6ba330
Fix global resource test
zhongchun Mar 22, 2022
af8198a
Fix worker slot
zhongchun Mar 23, 2022
e59b060
Merge remote-tracking branch 'upstream/master' into feat-resource-sch…
zhongchun Mar 23, 2022
f96d582
Fix get_slots of ThreadedServiceContext
zhongchun Mar 23, 2022
0b5ecfd
Fix test_service.py
zhongchun Mar 23, 2022
ae95697
Fix test_service.py
zhongchun Mar 24, 2022
bbeb85d
Rename num_mem_bytes to mem_bytes and lint
zhongchun Mar 24, 2022
c85007d
Remove .python-version
zhongchun Mar 24, 2022
46ca379
Rename num_mem_bytes to mem_bytes
zhongchun Mar 25, 2022
aa6a555
Add ResourceEvaluator
zhongchun Mar 25, 2022
d56957b
Fix absolute imports
zhongchun Mar 25, 2022
72966d6
Merge remote-tracking branch 'upstream/master' into feat-resource-sch…
zhongchun Mar 25, 2022
b084aa5
Fix test_no_supervisor
zhongchun Mar 25, 2022
7ed161e
Fix time_assigner
zhongchun Mar 25, 2022
545a706
Rename band_slots to band_resource of GraphAnalyzer
zhongchun Mar 28, 2022
7 changes: 5 additions & 2 deletions asv_bench/benchmarks/graph_assigner.py
@@ -15,6 +15,7 @@
import mars.tensor as mt
import mars.dataframe as md
from mars.core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
+ from mars.resource import Resource
from mars.services.task.analyzer import GraphAnalyzer
from mars.services.task.analyzer.assigner import GraphAssigner

@@ -39,8 +40,10 @@ def setup(self):

def time_assigner(self):
start_ops = list(GraphAnalyzer._iter_start_ops(self.chunk_graph))
- band_slots = {(f"worker-{i}", "numa-0"): 16 for i in range(50)}
+ band_resource = {
+     (f"worker-{i}", "numa-0"): Resource(num_cpus=16) for i in range(50)
+ }
current_assign = {}
- assigner = GraphAssigner(self.chunk_graph, start_ops, band_slots)
+ assigner = GraphAssigner(self.chunk_graph, start_ops, band_resource)
assigned_result = assigner.assign(current_assign)
assert len(assigned_result) == len(start_ops)
20 changes: 10 additions & 10 deletions mars/_resource.pyx
@@ -16,15 +16,15 @@ cdef class Resource:
cdef readonly:
float num_cpus
float num_gpus
- float num_mem_bytes
+ float mem_bytes

- def __init__(self, float num_cpus=0, float num_gpus=0, float num_mem_bytes=0):
+ def __init__(self, float num_cpus=0, float num_gpus=0, float mem_bytes=0):
self.num_cpus = num_cpus
self.num_gpus = num_gpus
- self.num_mem_bytes = num_mem_bytes
+ self.mem_bytes = mem_bytes

def __eq__(self, Resource other):
- return self.num_mem_bytes == other.num_mem_bytes and \
+ return self.mem_bytes == other.mem_bytes and \
self.num_gpus == other.num_gpus and \
self.num_cpus == other.num_cpus

@@ -33,22 +33,22 @@

def __le__(self, Resource other):
# memory first, then gpu, cpu last
- return self.num_mem_bytes <= other.num_mem_bytes and \
+ return self.mem_bytes <= other.mem_bytes and \
self.num_gpus <= other.num_gpus and \
self.num_cpus <= other.num_cpus

def __add__(self, Resource other):
return Resource(num_cpus=self.num_cpus + other.num_cpus,
num_gpus=self.num_gpus + other.num_gpus,
- num_mem_bytes=self.num_mem_bytes + other.num_mem_bytes)
+ mem_bytes=self.mem_bytes + other.mem_bytes)
def __sub__(self, Resource other):
return Resource(num_cpus=self.num_cpus - other.num_cpus,
num_gpus=self.num_gpus - other.num_gpus,
- num_mem_bytes=self.num_mem_bytes - other.num_mem_bytes)
+ mem_bytes=self.mem_bytes - other.mem_bytes)
def __neg__(self):
- return Resource(num_cpus=-self.num_cpus, num_gpus=-self.num_gpus, num_mem_bytes=-self.num_mem_bytes)
+ return Resource(num_cpus=-self.num_cpus, num_gpus=-self.num_gpus, mem_bytes=-self.mem_bytes)

def __repr__(self):
return f"Resource(num_cpus={self.num_cpus}, num_gpus={self.num_gpus}, num_mem_bytes={self.num_mem_bytes})"
return f"Resource(num_cpus={self.num_cpus}, num_gpus={self.num_gpus}, mem_bytes={self.mem_bytes})"

- ZeroResource = Resource(num_cpus=0, num_gpus=0, num_mem_bytes=0)
+ ZeroResource = Resource(num_cpus=0, num_gpus=0, mem_bytes=0)
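The operators above are what the scheduler relies on when booking and releasing band capacity. Below is a minimal usage sketch, assuming the Python-visible behaviour matches the Cython definition in this hunk; the snippet itself is not part of the PR:

from mars.resource import Resource

total = Resource(num_cpus=16, num_gpus=2, mem_bytes=32 * 1024**3)
request = Resource(num_cpus=4, mem_bytes=8 * 1024**3)

# __le__ only holds when memory, GPU and CPU all fit (checked in that order).
assert request <= total

# Booking a request is subtraction; releasing it is addition.
remaining = total - request
assert remaining == Resource(num_cpus=12, num_gpus=2, mem_bytes=24 * 1024**3)
assert remaining + request == total

# Over-subscription shows up as a negative component.
print(remaining - Resource(num_cpus=64))
# e.g. Resource(num_cpus=-52.0, num_gpus=2.0, mem_bytes=25769803776.0)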
2 changes: 1 addition & 1 deletion mars/deploy/kubernetes/worker.py
@@ -33,7 +33,7 @@ async def start_services(self):
await start_worker(
self.pool.external_address,
self.args.supervisors,
- self.band_to_slot,
+ self.band_to_resource,
list(self.args.load_modules),
self.config,
mark_ready=False,
31 changes: 21 additions & 10 deletions mars/deploy/oscar/local.py
@@ -24,7 +24,7 @@
from ... import oscar as mo
from ...core.entrypoints import init_extension_entrypoints
from ...lib.aio import get_isolation, stop_isolation
- from ...resource import cpu_count, cuda_count
+ from ...resource import cpu_count, cuda_count, mem_total, Resource
from ...services import NodeRole
from ...typing import ClusterType, ClientType
from ..utils import get_third_party_modules_from_config
@@ -51,6 +51,7 @@ async def new_cluster_in_isolation(
address: str = "0.0.0.0",
n_worker: int = 1,
n_cpu: Union[int, str] = "auto",
+ mem_bytes: Union[int, str] = "auto",
cuda_devices: Union[List[int], str] = "auto",
subprocess_start_method: str = None,
backend: str = None,
@@ -65,6 +66,7 @@
address,
n_worker,
n_cpu,
+ mem_bytes,
cuda_devices,
subprocess_start_method,
config,
@@ -79,6 +81,7 @@ async def new_cluster(
address: str = "0.0.0.0",
n_worker: int = 1,
n_cpu: Union[int, str] = "auto",
+ mem_bytes: Union[int, str] = "auto",
cuda_devices: Union[List[int], str] = "auto",
subprocess_start_method: str = None,
config: Union[str, Dict] = None,
@@ -91,6 +94,7 @@
address,
n_worker=n_worker,
n_cpu=n_cpu,
+ mem_bytes=mem_bytes,
cuda_devices=cuda_devices,
subprocess_start_method=subprocess_start_method,
config=config,
@@ -116,6 +120,7 @@ def __init__(
address: str = "0.0.0.0",
n_worker: int = 1,
n_cpu: Union[int, str] = "auto",
+ mem_bytes: Union[int, str] = "auto",
cuda_devices: Union[List[int], List[List[int]], str] = "auto",
subprocess_start_method: str = None,
config: Union[str, Dict] = None,
@@ -132,6 +137,7 @@
self._subprocess_start_method = subprocess_start_method
self._config = config
self._n_cpu = cpu_count() if n_cpu == "auto" else n_cpu
+ self._mem_bytes = mem_total() if mem_bytes == "auto" else mem_bytes
self._n_supervisor_process = n_supervisor_process
if cuda_devices == "auto":
total = cuda_count()
@@ -148,19 +154,22 @@

self._n_worker = n_worker
self._web = web
- self._bands_to_slot = bands_to_slot = []
+ self._bands_to_resource = bands_to_resource = []
worker_cpus = self._n_cpu // n_worker
if sum(len(devices) for devices in devices_list) == 0:
assert worker_cpus > 0, (
f"{self._n_cpu} cpus are not enough "
f"for {n_worker}, try to decrease workers."
)
+ mem_bytes = self._mem_bytes // n_worker
for _, devices in zip(range(n_worker), devices_list):
- worker_band_to_slot = dict()
- worker_band_to_slot["numa-0"] = worker_cpus
+ worker_band_to_resource = dict()
+ worker_band_to_resource["numa-0"] = Resource(
+     num_cpus=worker_cpus, mem_bytes=mem_bytes
+ )
for i in devices: # pragma: no cover
worker_band_to_slot[f"gpu-{i}"] = 1
bands_to_slot.append(worker_band_to_slot)
worker_band_to_resource[f"gpu-{i}"] = Resource(num_gpus=1)
bands_to_resource.append(worker_band_to_resource)
self._supervisor_pool = None
self._worker_pools = []

@@ -211,10 +220,10 @@ async def _start_worker_pools(self):
worker_modules = get_third_party_modules_from_config(
self._config, NodeRole.WORKER
)
- for band_to_slot in self._bands_to_slot:
+ for band_to_resource in self._bands_to_resource:
worker_pool = await create_worker_actor_pool(
self._address,
- band_to_slot,
+ band_to_resource,
modules=worker_modules,
subprocess_start_method=self._subprocess_start_method,
metrics=self._config.get("metrics", {}),
@@ -225,11 +234,13 @@ async def _start_service(self):
self._web = await start_supervisor(
self.supervisor_address, config=self._config, web=self._web
)
- for worker_pool, band_to_slot in zip(self._worker_pools, self._bands_to_slot):
+ for worker_pool, band_to_resource in zip(
+     self._worker_pools, self._bands_to_resource
+ ):
await start_worker(
worker_pool.external_address,
self.supervisor_address,
- band_to_slot,
+ band_to_resource,
config=self._config,
)

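To make the new mem_bytes option concrete: for a hypothetical two-worker local cluster with 16 CPUs, 64 GiB of memory and one GPU per worker (numbers invented for illustration, not taken from the PR), the band construction loop above would hand each worker pool a band dict shaped roughly like this:

from mars.resource import Resource

bands_to_resource = [
    {
        "numa-0": Resource(num_cpus=8, mem_bytes=32 * 1024**3),
        "gpu-0": Resource(num_gpus=1),
    },
    {
        "numa-0": Resource(num_cpus=8, mem_bytes=32 * 1024**3),
        "gpu-1": Resource(num_gpus=1),
    },
]
# n_cpu and mem_bytes are split evenly across workers; each GPU becomes its own band.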
16 changes: 10 additions & 6 deletions mars/deploy/oscar/pool.py
@@ -16,7 +16,7 @@
from typing import Dict, List

from ... import oscar as mo
- from ...resource import cuda_count
+ from ...resource import cuda_count, Resource

try:
from IPython import get_ipython
@@ -46,7 +46,7 @@

async def create_worker_actor_pool(
address: str,
- band_to_slots: Dict[str, int],
+ band_to_resource: Dict[str, Resource],
n_io_process: int = 1,
modules: List[str] = None,
ports: List[int] = None,
@@ -55,7 +55,10 @@ async def create_worker_actor_pool(
**kwargs,
):
# TODO: support NUMA when ready
- n_process = sum(slot for slot in band_to_slots.values())
+ n_process = sum(
+     int(resource.num_cpus) or int(resource.num_gpus)
+     for resource in band_to_resource.values()
+ )
envs = []
labels = ["main"]

@@ -67,15 +70,16 @@
cuda_devices = [int(i) for i in env_devices.split(",")]

i_gpu = iter(sorted(cuda_devices))
- for band, slot in band_to_slots.items():
+ for band, resource in band_to_resource.items():
if band.startswith("gpu"): # pragma: no cover
idx = str(next(i_gpu))
envs.append({"CUDA_VISIBLE_DEVICES": idx})
labels.append(f"gpu-{idx}")
else:
assert band.startswith("numa")
- envs.extend([dict() for _ in range(slot)])
- labels.extend([band] * slot)
+ num_cpus = int(resource.num_cpus)
+ envs.extend([dict() for _ in range(num_cpus)])
+ labels.extend([band] * num_cpus)

suspend_sigint = get_ipython is not None and get_ipython() is not None
return await mo.create_actor_pool(
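The n_process computation and the label loop above decide how many sub-processes a worker actor pool gets and which band each one serves. A hedged sketch with invented numbers:

from mars.resource import Resource

band_to_resource = {
    "numa-0": Resource(num_cpus=4, mem_bytes=8 * 1024**3),
    "gpu-0": Resource(num_gpus=1),
}

# CPU bands count their cores; GPU bands (num_cpus == 0) fall back to their GPU count.
n_process = sum(
    int(r.num_cpus) or int(r.num_gpus) for r in band_to_resource.values()
)
assert n_process == 5  # 4 CPU sub-processes plus 1 GPU sub-process

# The pool labels end up roughly as ["main", "numa-0", "numa-0", "numa-0", "numa-0", "gpu-0"].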
30 changes: 20 additions & 10 deletions mars/deploy/oscar/ray.py
@@ -30,6 +30,7 @@
)
from ...oscar.backends.ray.pool import RayPoolState
from ...oscar.errors import ReconstructWorkerError
+ from ...resource import Resource
from ...services.cluster.backends.base import (
register_cluster_backend,
AbstractClusterBackend,
@@ -130,7 +131,7 @@
class ClusterStateActor(mo.StatelessActor):
def __init__(self):
self._worker_cpu, self._worker_mem, self._config = None, None, None
- self._pg_name, self._band_to_slot, self._worker_modules = None, None, None
+ self._pg_name, self._band_to_resource, self._worker_modules = None, None, None
self._pg_counter = itertools.count()
self._worker_count = 0
self._workers = {}
@@ -147,7 +148,9 @@ def set_config(self, worker_cpu, worker_mem, config):
config,
)
# TODO(chaokunyang) Support gpu
- self._band_to_slot = {"numa-0": self._worker_cpu}
+ self._band_to_resource = {
+     "numa-0": Resource(num_cpus=self._worker_cpu, mem_bytes=self._worker_mem)
+ }
self._worker_modules = get_third_party_modules_from_config(
self._config, NodeRole.WORKER
)
@@ -156,11 +159,14 @@ async def request_worker(
self, worker_cpu: int = None, worker_mem: int = None, timeout: int = None
) -> Optional[str]:
worker_cpu = worker_cpu or self._worker_cpu
+ worker_mem = worker_mem or self._worker_mem
bundle = {
"CPU": worker_cpu,
# "memory": worker_mem or self._worker_mem
}
- band_to_slot = {"numa-0": worker_cpu}
+ band_to_resource = {
+     "numa-0": Resource(num_cpus=worker_cpu, mem_bytes=worker_mem)
+ }
start_time = time.time()
logger.info("Start to request worker with resource %s.", bundle)
# TODO rescale ray placement group instead of creating new placement group
@@ -193,7 +199,7 @@ async def request_worker(
)
worker_address = process_placement_to_address(pg_name, 0, 0)
worker_pool = await self.create_worker(worker_address)
- await self.start_worker(worker_address, band_to_slot=band_to_slot)
+ await self.start_worker(worker_address, band_to_resource=band_to_resource)
logger.info(
"Request worker %s succeeds in %.4f seconds",
worker_address,
@@ -206,7 +212,7 @@ async def create_worker(self, worker_address):
start_time = time.time()
worker_pool = await create_worker_actor_pool(
worker_address,
- self._band_to_slot,
+ self._band_to_resource,
modules=self._worker_modules,
metrics=self._config.get("metrics", {}),
)
@@ -217,12 +223,12 @@
)
return worker_pool

- async def start_worker(self, worker_address, band_to_slot=None):
+ async def start_worker(self, worker_address, band_to_resource=None):
self._worker_count += 1
start_time = time.time()
- band_to_slot = band_to_slot or self._band_to_slot
+ band_to_resource = band_to_resource or self._band_to_resource
await start_worker(
- worker_address, self.address, band_to_slot, config=self._config
+ worker_address, self.address, band_to_resource, config=self._config
)
worker_pool = ray.get_actor(worker_address)
await worker_pool.mark_service_ready.remote()
@@ -290,7 +296,7 @@ async def _reconstruct_worker():

start_time = time.time()
await start_worker(
- address, self.address, self._band_to_slot, config=self._config
+ address, self.address, self._band_to_resource, config=self._config
)
await actor.mark_service_ready.remote()
logger.info(
@@ -514,7 +520,11 @@ async def start(self):
asyncio.create_task(
create_worker_actor_pool(
addr,
{"numa-0": self._worker_cpu},
{
"numa-0": Resource(
num_cpus=self._worker_cpu, mem_bytes=self._worker_mem
)
},
modules=get_third_party_modules_from_config(
self._config, NodeRole.WORKER
),
7 changes: 4 additions & 3 deletions mars/deploy/oscar/service.py
@@ -16,6 +16,7 @@
import os
from typing import List, Dict, Union

+ from ...resource import Resource
from ...services import start_services, stop_services, NodeRole
from ..utils import load_service_config_file

@@ -75,7 +76,7 @@ async def stop_supervisor(address: str, config: Dict = None):
async def start_worker(
address: str,
lookup_address: str,
- band_to_slots: Dict[str, int],
+ band_to_resource: Dict[str, Resource],
modules: Union[List, str, None] = None,
config: Dict = None,
mark_ready: bool = True,
@@ -87,9 +88,9 @@
if backend == "fixed" and config["cluster"].get("lookup_address") is None:
config["cluster"]["lookup_address"] = lookup_address
if config["cluster"].get("resource") is None:
config["cluster"]["resource"] = band_to_slots
config["cluster"]["resource"] = band_to_resource
if any(
band_name.startswith("gpu-") for band_name in band_to_slots
band_name.startswith("gpu-") for band_name in band_to_resource
): # pragma: no cover
if "cuda" not in config["storage"]["backends"]:
config["storage"]["backends"].append("cuda")
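With the signature change above, callers now hand start_worker a band-to-Resource mapping rather than slot counts. A hedged sketch of such a call (addresses and sizes are invented; the real callers are the deploy backends earlier in this PR):

from mars.resource import Resource
from mars.deploy.oscar.service import start_worker

band_to_resource = {
    "numa-0": Resource(num_cpus=8, mem_bytes=16 * 1024**3),
    "gpu-0": Resource(num_gpus=1),
}

# Inside a running event loop:
# await start_worker("127.0.0.1:20001", "127.0.0.1:20000", band_to_resource)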
4 changes: 2 additions & 2 deletions mars/deploy/oscar/session.py
@@ -1256,10 +1256,10 @@ async def fetch_tileable_op_logs(
async def get_total_n_cpu(self):
all_bands = await self._cluster_api.get_all_bands()
n_cpu = 0
- for band, size in all_bands.items():
+ for band, resource in all_bands.items():
_, band_name = band
if band_name.startswith("numa-"):
- n_cpu += size
+ n_cpu += resource.num_cpus
return n_cpu

async def get_cluster_versions(self) -> List[str]:
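Since bands now map to Resource objects, get_total_n_cpu sums num_cpus over NUMA bands only. A quick sketch with an assumed two-worker layout (not from the PR):

from mars.resource import Resource

all_bands = {
    ("worker-0:1234", "numa-0"): Resource(num_cpus=8, mem_bytes=16 * 1024**3),
    ("worker-0:1234", "gpu-0"): Resource(num_gpus=1),
    ("worker-1:1234", "numa-0"): Resource(num_cpus=8, mem_bytes=16 * 1024**3),
}

n_cpu = sum(
    r.num_cpus
    for (_, band_name), r in all_bands.items()
    if band_name.startswith("numa-")
)
assert n_cpu == 16  # GPU bands do not contribute to the CPU total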