Skip to content

Commit

Permalink
[BACKPORT] Use OS-designated ports instead of random ports to create …
Browse files Browse the repository at this point in the history
…sub pools (#3053) (#3087)
  • Loading branch information
wjsi authored May 25, 2022
1 parent dfd0bdb commit 4852097
Show file tree
Hide file tree
Showing 59 changed files with 421 additions and 241 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ include mars/services/web/static/*
global-exclude .DS_Store
include versioneer.py
include mars/_version.py
global-exclude conftest.py
2 changes: 1 addition & 1 deletion benchmarks/tpch/gen_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import shutil
import subprocess
from multiprocessing import Pool, set_start_method
import pandas as pd
import pyarrow.parquet as pq


Expand All @@ -44,6 +43,7 @@
# Change location of tpch-dbgen if not in same place as this script
tpch_dbgen_location = "./tpch-dbgen"


# First element is the table single character short-hand understood by dbgen
# Second element is the number of pieces we want the parquet dataset to have for that table
# Third element is the function that reads generated CSV to a pandas dataframe
Expand Down
5 changes: 2 additions & 3 deletions benchmarks/tpch/run_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ def q03(lineitem, orders, customer):

@tpc_query
def q04(lineitem, orders):
t1 = time.time()
date1 = md.Timestamp("1993-11-01")
date2 = md.Timestamp("1993-08-01")
lsel = lineitem.L_COMMITDATE < lineitem.L_RECEIPTDATE
Expand Down Expand Up @@ -617,7 +616,7 @@ def g2(x):
def q13(customer, orders):
customer_filtered = customer.loc[:, ["C_CUSTKEY"]]
orders_filtered = orders[
~orders["O_COMMENT"].str.contains("special[\S|\s]*requests")
~orders["O_COMMENT"].str.contains(r"special[\S|\s]*requests")
]
orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY", "O_CUSTKEY"]]
c_o_merged = customer_filtered.merge(
Expand Down Expand Up @@ -696,7 +695,7 @@ def q16(part, partsupp, supplier):
)
total = total.loc[:, ["P_BRAND", "P_TYPE", "P_SIZE", "PS_SUPPKEY"]]
supplier_filtered = supplier[
supplier["S_COMMENT"].str.contains("Customer(\S|\s)*Complaints")
supplier["S_COMMENT"].str.contains(r"Customer(\S|\s)*Complaints")
]
supplier_filtered = supplier_filtered.loc[:, ["S_SUPPKEY"]].drop_duplicates()
# left merge to select only PS_SUPPKEY values not in supplier_filtered
Expand Down
36 changes: 30 additions & 6 deletions mars/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from mars.config import option_context
from mars.core.mode import is_kernel_mode, is_build_mode
from mars.lib.aio.lru import clear_all_alru_caches
from mars.oscar.backends.router import Router
from mars.oscar.backends.ray.communication import RayServer
from mars.serialization.ray import register_ray_serializers, unregister_ray_serializers
Expand All @@ -30,6 +31,21 @@
MARS_CI_BACKEND = os.environ.get("MARS_CI_BACKEND", "mars")


@pytest.fixture(autouse=True)
def auto_cleanup(request):
    """Register ``clear_all_alru_caches`` as a finalizer of the requesting test.

    The fixture is autouse, so the finalizer is attached without tests
    having to request it explicitly.
    """
    finalizer = clear_all_alru_caches
    request.addfinalizer(finalizer)


@pytest.fixture(scope="module", autouse=True)
def check_router_cleaned(request):
    """Assert at module teardown that no router addresses were left behind.

    A finalizer is registered that, when a ``Router`` instance is set,
    checks that both its ``_mapping`` and ``_local_mapping`` are empty.
    Nothing is checked when no instance is set.
    """

    def _assert_router_empty():
        router = Router.get_instance()
        if router is None:
            # no router installed, nothing to verify
            return
        assert len(router._mapping) == 0
        assert len(router._local_mapping) == 0

    request.addfinalizer(_assert_router_empty)


@pytest.fixture(scope="module")
def ray_start_regular_shared(request):  # pragma: no cover
    """Module-scoped fixture that delegates entirely to ``_ray_start_regular``."""
    delegate = _ray_start_regular(request)
    yield from delegate
Expand All @@ -49,6 +65,7 @@ def ray_start_regular_shared2(request): # pragma: no cover
yield ray.init(num_cpus=num_cpus, job_config=job_config)
finally:
ray.shutdown()
Router.set_instance(None)
os.environ.pop("RAY_kill_idle_workers_interval_ms", None)


Expand Down Expand Up @@ -132,10 +149,11 @@ def stop_ray(request): # pragma: no cover
yield
if ray.is_initialized():
ray.shutdown()
Router.set_instance(None)


@pytest.fixture
async def ray_create_mars_cluster(request):
async def ray_create_mars_cluster(request, check_router_cleaned):
from mars.deploy.oscar.ray import new_cluster, _load_config

ray_config = _load_config()
Expand All @@ -153,12 +171,15 @@ async def ray_create_mars_cluster(request):
worker_mem=worker_mem,
config=ray_config,
)
async with client:
yield client
try:
async with client:
yield client
finally:
Router.set_instance(None)


@pytest.fixture(scope="module")
def _new_test_session():
def _new_test_session(check_router_cleaned):
from .deploy.oscar.tests.session import new_test_session

sess = new_test_session(
Expand All @@ -173,10 +194,11 @@ def _new_test_session():
yield sess
finally:
sess.stop_server(isolation=False)
Router.set_instance(None)


@pytest.fixture(scope="module")
def _new_integrated_test_session():
def _new_integrated_test_session(check_router_cleaned):
from .deploy.oscar.tests.session import new_test_session

sess = new_test_session(
Expand All @@ -194,6 +216,7 @@ def _new_integrated_test_session():
try:
sess.stop_server(isolation=False)
except concurrent.futures.TimeoutError:
Router.set_instance(None)
subprocesses = psutil.Process().children(recursive=True)
for proc in subprocesses:
proc.terminate()
Expand All @@ -209,7 +232,7 @@ def _new_integrated_test_session():


@pytest.fixture(scope="module")
def _new_gpu_test_session(): # pragma: no cover
def _new_gpu_test_session(check_router_cleaned): # pragma: no cover
from .deploy.oscar.tests.session import new_test_session
from .resource import cuda_count

Expand All @@ -230,6 +253,7 @@ def _new_gpu_test_session(): # pragma: no cover
yield sess
finally:
sess.stop_server(isolation=False)
Router.set_instance(None)


@pytest.fixture
Expand Down
3 changes: 3 additions & 0 deletions mars/deploy/oscar/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ... import oscar as mo
from ...core.entrypoints import init_extension_entrypoints
from ...lib.aio import get_isolation, stop_isolation
from ...oscar.backends.router import Router
from ...resource import cpu_count, cuda_count, mem_total
from ...services import NodeRole
from ...services.task.execution.api import ExecutionConfig
Expand Down Expand Up @@ -123,6 +124,7 @@ async def stop_cluster(cluster: ClusterType):
isolation = get_isolation()
coro = cluster.stop()
await asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coro, isolation.loop))
Router.set_instance(None)


class LocalCluster:
Expand Down Expand Up @@ -276,6 +278,7 @@ async def stop(self):
await self._supervisor_pool.stop()
AbstractSession.reset_default()
self._exiting_check_task.cancel()
Router.set_instance(None)


class LocalClient:
Expand Down
2 changes: 2 additions & 0 deletions mars/deploy/oscar/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
process_address_to_placement,
)
from ...oscar.backends.ray.pool import RayPoolState
from ...oscar.backends.router import Router
from ...oscar.errors import ReconstructWorkerError
from ...resource import Resource
from ...services.cluster.backends.base import (
Expand Down Expand Up @@ -569,6 +570,7 @@ async def stop(self):
finally:
AbstractSession.reset_default()
RayActorDriver.stop_cluster()
Router.set_instance(None)
self._stopped = True


Expand Down
8 changes: 4 additions & 4 deletions mars/deploy/oscar/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,11 +631,11 @@ def setup_session():
session = new_session(n_cpu=2, use_uvloop=False)
assert session.get_web_endpoint() is not None

with session:
with option_context({"show_progress": False}):
try:
with session, option_context({"show_progress": False}):
yield session

session.stop_server()
finally:
session.stop_server()


def test_decref(setup_session):
Expand Down
14 changes: 0 additions & 14 deletions mars/lib/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,3 @@
from ._threads import to_thread

asyncio.to_thread = to_thread


if sys.version_info[:2] < (3, 7):
# patch run and get_running_loop etc for python 3.6
from ._runners import get_running_loop, run

asyncio.run = run
asyncio.get_running_loop = get_running_loop
asyncio.create_task = asyncio.ensure_future

# patch async generator
from async_generator import asynccontextmanager

contextlib.asynccontextmanager = asynccontextmanager
14 changes: 13 additions & 1 deletion mars/lib/aio/lru.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,23 @@
# THE SOFTWARE.

import asyncio
import os
import weakref
from collections import OrderedDict
from functools import _CacheInfo, _make_key, partial, wraps


__version__ = "1.0.2"

__all__ = ("alru_cache",)
__all__ = ("alru_cache", "clear_all_alru_caches")

_is_ci = (os.environ.get("CI") or "0").lower() in ("1", "true")
_all_wrapped = weakref.WeakSet()


def clear_all_alru_caches():
    """Call ``cache_clear()`` on every wrapper tracked in ``_all_wrapped``.

    ``_all_wrapped`` is a ``WeakSet``, so only wrappers that are still
    alive are visited.
    """
    for cached_fn in _all_wrapped:
        cached_fn.cache_clear()


def unpartial(fn):
Expand Down Expand Up @@ -206,6 +216,8 @@ async def wrapped(*fn_args, **fn_kwargs):
wrapped.close = partial(_close, wrapped)
wrapped.open = partial(_open, wrapped)

if _is_ci:
_all_wrapped.add(wrapped)
return wrapped

if fn is None:
Expand Down
5 changes: 5 additions & 0 deletions mars/oscar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TypeVar, Union

# import aio to ensure patch enabled for Python 3.6
from ..lib import aio

Expand Down Expand Up @@ -49,3 +51,6 @@
from .backends import mars, ray, test

del mars, ray, test

_T = TypeVar("_T")
ActorRefType = Union[ActorRef, _T]
4 changes: 2 additions & 2 deletions mars/oscar/backends/communication/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,10 @@ async def connect(
server = DummyServer.get_instance(dest_address)
if server is None: # pragma: no cover
raise RuntimeError(
"DummyServer needs to be created first before DummyClient"
f"DummyServer {dest_address} needs to be created first before DummyClient"
)
if server.stopped: # pragma: no cover
raise ConnectionError("Dummy server closed")
raise ConnectionError(f"Dummy server {dest_address} closed")

q1, q2 = asyncio.Queue(), asyncio.Queue()
closed = asyncio.Event()
Expand Down
5 changes: 5 additions & 0 deletions mars/oscar/backends/communication/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,15 @@ async def handle_connection(reader, writer):
reader, writer, local_address=server.address
)

port = port if port != 0 else None
aio_server = await asyncio.start_server(
handle_connection, host=host, port=port, **config
)

# get port of the socket if not specified
if not port:
port = aio_server.sockets[0].getsockname()[1]

if _is_windows:
for sock in aio_server.sockets:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
Expand Down
24 changes: 23 additions & 1 deletion mars/oscar/backends/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ def add_pool_conf(
"logging_conf": logging_conf,
"kwargs": kwargs or {},
}

mapping: Dict = self._conf["mapping"]
for addr in external_address:
mapping: Dict = self._conf["mapping"]
mapping[addr] = internal_address

def get_pool_config(self, process_index: int):
Expand All @@ -81,6 +82,27 @@ def get_process_index(self, external_address: str):
f"Cannot get process_index for {external_address}"
) # pragma: no cover

def reset_pool_external_address(
    self,
    process_index: int,
    external_address: Union[str, List[str]],
):
    """Replace the external address(es) recorded for one pool.

    The pool's previous external addresses are removed from the
    external-to-internal ``mapping`` and the new ones are registered
    against the pool's internal address.

    Parameters
    ----------
    process_index : int
        Key of the pool inside ``self._conf["pools"]``.
    external_address : str or list of str
        New external address, or a list/tuple of them. A sequence is
        copied, so later changes to the caller's object do not affect the
        stored configuration.

    Raises
    ------
    KeyError
        If ``process_index`` is not a configured pool.
    """
    if isinstance(external_address, (list, tuple)):
        # store a private list: avoids aliasing the caller's sequence and
        # keeps a tuple of addresses from being treated as one address
        external_address = list(external_address)
    else:
        external_address = [external_address]
    cur_pool_config = self._conf["pools"][process_index]
    internal_address = cur_pool_config["internal_address"]

    mapping: Dict = self._conf["mapping"]
    for addr in cur_pool_config["external_address"]:
        if internal_address == addr:
            # internal address may be the same as external address in Windows
            # NOTE(review): only the mapping target changes here; the pool's
            # stored "internal_address" entry is left untouched -- confirm
            # that this is intended.
            internal_address = external_address[0]
        mapping.pop(addr, None)

    cur_pool_config["external_address"] = external_address
    for addr in external_address:
        mapping[addr] = internal_address

def get_external_addresses(self, label=None) -> List[str]:
result = []
for c in self._conf["pools"].values():
Expand Down
Loading

0 comments on commit 4852097

Please sign in to comment.