From 67b0e428c76b966cb5715b14441ec175bdc6bcac Mon Sep 17 00:00:00 2001 From: loopyme Date: Wed, 4 Aug 2021 20:38:36 +0800 Subject: [PATCH 01/15] Add experimental dask on mars support --- mars/experimental/__init__.py | 13 +++++ mars/experimental/dask/__init__.py | 17 ++++++ mars/experimental/dask/converter.py | 50 ++++++++++++++++++ mars/experimental/dask/scheduler.py | 82 +++++++++++++++++++++++++++++ mars/experimental/dask/utils.py | 69 ++++++++++++++++++++++++ 5 files changed, 231 insertions(+) create mode 100644 mars/experimental/__init__.py create mode 100644 mars/experimental/dask/__init__.py create mode 100644 mars/experimental/dask/converter.py create mode 100644 mars/experimental/dask/scheduler.py create mode 100644 mars/experimental/dask/utils.py diff --git a/mars/experimental/__init__.py b/mars/experimental/__init__.py new file mode 100644 index 0000000000..c71e83c08e --- /dev/null +++ b/mars/experimental/__init__.py @@ -0,0 +1,13 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/mars/experimental/dask/__init__.py b/mars/experimental/dask/__init__.py new file mode 100644 index 0000000000..bbf51db5e6 --- /dev/null +++ b/mars/experimental/dask/__init__.py @@ -0,0 +1,17 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# noinspection PyUnresolvedReferences +from .converter import convert_dask_collection +from .scheduler import mars_scheduler diff --git a/mars/experimental/dask/converter.py b/mars/experimental/dask/converter.py new file mode 100644 index 0000000000..df73c82b18 --- /dev/null +++ b/mars/experimental/dask/converter.py @@ -0,0 +1,50 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dask import is_dask_collection, optimize + +from .scheduler import mars_dask_get +from .utils import reduce + + +def convert_dask_collection(dc): + """ + Convert dask collection object into mars.core.Object via remote API + + Parameters + ---------- + dc: dask collection + Dask collection object to be converted. + + Returns + ------- + Object + Mars Object. 
+ """ + if not is_dask_collection(dc): + raise ValueError(f"'{type(dc).__name__}' object is not a valid dask collection") + + dc.__dask_graph__().validate() + dsk = optimize(dc)[0].__dask_graph__() + + first_key = next(iter(dsk.keys())) + if isinstance(first_key, str): + key = [first_key] + elif isinstance(first_key, tuple): + key = sorted([i for i in dsk.keys() if i[0] == first_key[0]], key=lambda x: x[1]) + else: + raise ValueError( + f"Dask collection object seems be broken, with unexpected key type:'{type(first_key).__name__}'") + + return reduce(mars_dask_get(dsk, [key])) diff --git a/mars/experimental/dask/scheduler.py b/mars/experimental/dask/scheduler.py new file mode 100644 index 0000000000..46b2417d09 --- /dev/null +++ b/mars/experimental/dask/scheduler.py @@ -0,0 +1,82 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Tuple + +from dask.core import istask, ishashable + +from .utils import reduce +from ...remote import spawn + + +def mars_scheduler(dsk: dict, keys: List[List[str]]): + """ + A Dask-Mars scheduler. This scheduler is intended to be compatible + with existing dask user interface, no callbacks are implemented. + + Parameters + ---------- + dsk: Dict + Dask graph, represented as a task DAG dictionary. + keys: List[List[str]] + 2d-list of Dask graph keys whose values we wish to compute and return. 
+ + Returns + ------- + Object + Computed values corresponding to the provided keys. + """ + + return [[reduce(mars_dask_get(dsk, keys)).execute().fetch()]] + + +def mars_dask_get(dsk: dict, keys: List[List]): + """ + A Dask-Mars convert function. This function will send the dask graph layers + to Mars Remote API, generating mars objects correspond to the provided keys. + + Parameters + ---------- + dsk: Dict + Dask graph, represented as a task DAG dictionary. + keys: List[List[str]] + 2d-list of Dask graph keys whose values we wish to compute and return. + + Returns + ------- + Object + Spawned mars objects corresponding to the provided keys. + """ + + def _get_arg(a): + # if arg contains layer index or callable objs, handle it + if ishashable(a) and a in dsk.keys(): + while ishashable(a) and a in dsk.keys(): + a = dsk[a] + return _execute_task(a) + elif not isinstance(a, str) and hasattr(a, "__getitem__"): + if istask(a): # TODO:Handle `SubgraphCallable`, which may contains dsk in it + return spawn(a[0], args=tuple(_get_arg(i) for i in a[1:])) + elif isinstance(a, dict): + return {k: _get_arg(v) for k, v in a.items()} + elif isinstance(a, List) or isinstance(a, Tuple): + return type(a)(_get_arg(i) for i in a) + return a + + def _execute_task(task: tuple): + if not istask(task): + return _get_arg(task) + return spawn(task[0], args=tuple(_get_arg(a) for a in task[1:])) + + return [[_execute_task(dsk[k]) for k in keys_d] for keys_d in keys] diff --git a/mars/experimental/dask/utils.py b/mars/experimental/dask/utils.py new file mode 100644 index 0000000000..66ab4a2312 --- /dev/null +++ b/mars/experimental/dask/utils.py @@ -0,0 +1,69 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List + +from dask import is_dask_collection +from dask.array.core import _concatenate2 as array_concat +from dask.dataframe import concat as df_concat +from dask.utils import is_arraylike, is_dataframe_like, is_series_like, is_index_like + +from ...remote import spawn + + +def concat(objs: List): + """ + Concat the results of partitioned dask task executions. This function guess the + types of resulting list, then calls the corresponding native dask concat functions. + + Parameters + ---------- + objs: List + List of the partitioned dask task execution results, which will be concat. + + Returns + ------- + obj: + The concat result + + """ + if is_arraylike(objs[0]): + res = array_concat(objs, axes=[0]) # TODO: Add concat with args support + elif any((is_dataframe_like(objs[0]), is_series_like(objs[0]), is_index_like(objs[0]))): + res = df_concat(objs) + else: + res = objs + while isinstance(res, List): + res = res[0] + return res.compute() if is_dask_collection(res) else res + + +def reduce(objs: List[List]): + """ + Spawn a concat task for 2d-list objects + + Parameters + ---------- + objs: List + 2d-list of the partitioned dask task execution results, which will be concat. + + Returns + ------- + obj: + The spawning concat task. 
+ """ + return spawn( + concat, + args=([spawn(concat, args=(objs_d,)) for objs_d in objs],), + ) From 0557a3ff3aa7df90514f73c4fae1a4fb88124d10 Mon Sep 17 00:00:00 2001 From: loopyme Date: Wed, 4 Aug 2021 20:38:51 +0800 Subject: [PATCH 02/15] Add tests for dask on mars --- mars/experimental/dask/tests/__init__.py | 13 +++ mars/experimental/dask/tests/test_dask.py | 108 ++++++++++++++++++++++ 2 files changed, 121 insertions(+) create mode 100644 mars/experimental/dask/tests/__init__.py create mode 100644 mars/experimental/dask/tests/test_dask.py diff --git a/mars/experimental/dask/tests/__init__.py b/mars/experimental/dask/tests/__init__.py new file mode 100644 index 0000000000..7f34198853 --- /dev/null +++ b/mars/experimental/dask/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/mars/experimental/dask/tests/test_dask.py b/mars/experimental/dask/tests/test_dask.py new file mode 100644 index 0000000000..f3773b6cc1 --- /dev/null +++ b/mars/experimental/dask/tests/test_dask.py @@ -0,0 +1,108 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from mars.experimental.dask import convert_dask_collection, mars_scheduler + + +def test_delayed(): + from dask import delayed + import numpy as np + + def calc_chunk(n: int, i: int): + rs = np.random.RandomState(i) + a = rs.uniform(-1, 1, size=(n, 2)) + d = np.linalg.norm(a, axis=1) + return (d < 1).sum() + + def calc_pi(fs, N): + return sum(fs) * 4 / N + + N = 200_000_000 + n = 10_000_000 + + fs = [delayed(calc_chunk)(n, i) for i in range(N // n)] + pi = delayed(calc_pi)(fs, N) + + dask_res = pi.compute() + assert dask_res == pi.compute(scheduler=mars_scheduler) + assert dask_res == convert_dask_collection(pi).execute().fetch() + + +def test_partitioned_dataframe(): + import numpy as np + import pandas as pd + from dask import dataframe as dd + from pandas._testing import assert_frame_equal + + data = np.random.randn(10000, 100) + df = dd.from_pandas( + pd.DataFrame(data, columns=[f"col{i}" for i in range(100)]), npartitions=4 + ) + df["col0"] = df["col0"] + df["col1"] / 2 + col2_mean = df["col2"].mean() + df = df[df["col2"] > col2_mean] + + dask_res = df.compute() + assert_frame_equal(dask_res, df.compute(scheduler=mars_scheduler), check_index_type=False) + assert_frame_equal(dask_res, convert_dask_collection(df).execute().fetch(), check_index_type=False) + + +def test_unpartitioned_dataframe(): + from dask import dataframe as dd + from pandas._testing import assert_frame_equal + import pandas as pd + from sklearn.datasets import load_boston + + boston = load_boston() + pd.DataFrame(boston.data, 
columns=boston['feature_names']).to_csv("./boston_housing_data.csv") + + df = dd.read_csv(r"./boston_housing_data.csv") + df["CRIM"] = df["CRIM"] / 2 + + dask_res = df.compute() + assert_frame_equal(dask_res, df.compute(scheduler=mars_scheduler)) + assert_frame_equal(dask_res, convert_dask_collection(df).execute().fetch()) + + +def test_array(): + import dask.array as da + from numpy.core.numeric import array_equal + + x = da.random.random((10000, 10000), chunks=(1000, 1000)) + y = x + x.T + z = y[::2, 5000:].mean(axis=1) + + dask_res = z.compute() + assert array_equal(dask_res, z.compute(scheduler=mars_scheduler)) + assert array_equal(dask_res, convert_dask_collection(z).execute().fetch()) + + +def test_bag(): + import dask + + b = dask.datasets.make_people() # Make records of people + result = ( + b.filter(lambda record: record["age"] > 30) + .map(lambda record: record["occupation"]) + .frequencies(sort=True) + .topk(10, key=1) + ) + + dask_res = result.compute() + assert dask_res == result.compute(scheduler=mars_scheduler) + assert dask_res == list( + convert_dask_collection(result).execute().fetch() + ) # TODO: dask-bag computation will return weired tuple, which we don't know why From a259bb6e06b1bdcc4a03cf7ec4f74513ec52df3a Mon Sep 17 00:00:00 2001 From: loopyme Date: Sat, 7 Aug 2021 20:17:02 +0800 Subject: [PATCH 03/15] Rename `experimental` to `contrib` --- mars/{experimental => contrib}/__init__.py | 0 mars/{experimental => contrib}/dask/__init__.py | 0 mars/{experimental => contrib}/dask/converter.py | 0 mars/{experimental => contrib}/dask/scheduler.py | 0 mars/{experimental => contrib}/dask/tests/__init__.py | 0 mars/{experimental => contrib}/dask/tests/test_dask.py | 0 mars/{experimental => contrib}/dask/utils.py | 0 7 files changed, 0 insertions(+), 0 deletions(-) rename mars/{experimental => contrib}/__init__.py (100%) rename mars/{experimental => contrib}/dask/__init__.py (100%) rename mars/{experimental => contrib}/dask/converter.py (100%) rename 
mars/{experimental => contrib}/dask/scheduler.py (100%) rename mars/{experimental => contrib}/dask/tests/__init__.py (100%) rename mars/{experimental => contrib}/dask/tests/test_dask.py (100%) rename mars/{experimental => contrib}/dask/utils.py (100%) diff --git a/mars/experimental/__init__.py b/mars/contrib/__init__.py similarity index 100% rename from mars/experimental/__init__.py rename to mars/contrib/__init__.py diff --git a/mars/experimental/dask/__init__.py b/mars/contrib/dask/__init__.py similarity index 100% rename from mars/experimental/dask/__init__.py rename to mars/contrib/dask/__init__.py diff --git a/mars/experimental/dask/converter.py b/mars/contrib/dask/converter.py similarity index 100% rename from mars/experimental/dask/converter.py rename to mars/contrib/dask/converter.py diff --git a/mars/experimental/dask/scheduler.py b/mars/contrib/dask/scheduler.py similarity index 100% rename from mars/experimental/dask/scheduler.py rename to mars/contrib/dask/scheduler.py diff --git a/mars/experimental/dask/tests/__init__.py b/mars/contrib/dask/tests/__init__.py similarity index 100% rename from mars/experimental/dask/tests/__init__.py rename to mars/contrib/dask/tests/__init__.py diff --git a/mars/experimental/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py similarity index 100% rename from mars/experimental/dask/tests/test_dask.py rename to mars/contrib/dask/tests/test_dask.py diff --git a/mars/experimental/dask/utils.py b/mars/contrib/dask/utils.py similarity index 100% rename from mars/experimental/dask/utils.py rename to mars/contrib/dask/utils.py From 85f8b5c50a102883ccc093d782b4bf2121626c2c Mon Sep 17 00:00:00 2001 From: loopyme Date: Sat, 7 Aug 2021 21:30:19 +0800 Subject: [PATCH 04/15] Add a separate `run-dask` CI test --- .github/workflows/platform-ci.yml | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/.github/workflows/platform-ci.yml b/.github/workflows/platform-ci.yml index c968f18668..a5c06f1863 
100644 --- a/.github/workflows/platform-ci.yml +++ b/.github/workflows/platform-ci.yml @@ -9,7 +9,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-vineyard] + python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-vineyard, 3.8-dask] include: - { os: ubuntu-latest, python-version: 3.8-kubernetes, no-common-tests: 1, no-deploy: 1, with-kubernetes: "with Kubernetes" } @@ -19,6 +19,8 @@ jobs: no-deploy: 1, with-vineyard: "with vineyard" } - { os: ubuntu-latest, python-version: 3.8-ray, no-common-tests: 1, no-deploy: 1, with-ray: "with ray" } + - { os: ubuntu-latest, python-version: 3.8-dask, no-common-tests: 1, + no-deploy: 1, run-dask: "run dask" } steps: - name: Check out code @@ -40,6 +42,7 @@ jobs: WITH_KUBERNETES: ${{ matrix.with-kubernetes }} WITH_VINEYARD: ${{ matrix.with-vineyard }} WITH_RAY: ${{ matrix.with-ray }} + RUN_DASK: ${{ matrix.run-dask }} NO_COMMON_TESTS: ${{ matrix.no-common-tests }} shell: bash run: | @@ -88,6 +91,9 @@ jobs: # remove ray version after https://github.com/ray-project/ray/issues/16802 got fixed pip install ray[default]==1.3.0 fi + if [ -n "$RUN_DASK" ]; then + pip install dask[complete] mimesis sklearn + fi fi conda list -n test @@ -98,6 +104,7 @@ jobs: WITH_CYTHON: ${{ matrix.with-cython }} WITH_VINEYARD: ${{ matrix.with-vineyard }} WITH_RAY: ${{ matrix.with-ray }} + RUN_DASK: ${{ matrix.run-dask }} NO_COMMON_TESTS: ${{ matrix.no-common-tests }} NUMPY_EXPERIMENTAL_ARRAY_FUNCTION: 1 CHANGE_MINIKUBE_NONE_USER: true @@ -127,6 +134,10 @@ jobs: pytest $PYTEST_CONFIG --timeout=120 -m ray coverage report fi + if [ -n "$RUN_DASK" ]; then + pytest $PYTEST_CONFIG mars/contrib/dask/tests/test_dask.py + coverage report + fi coverage xml - name: Stop vineyard runtime From b754f1c563e126bf9a2acfe00c7c39d8e707e580 Mon Sep 17 00:00:00 2001 From: loopyme Date: Sat, 7 Aug 2021 23:54:11 +0800 Subject: [PATCH 05/15] Fix import problems in contrib/dask --- 
mars/contrib/dask/__init__.py | 15 +++++++++++++-- mars/contrib/dask/tests/test_dask.py | 4 +--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/mars/contrib/dask/__init__.py b/mars/contrib/dask/__init__.py index bbf51db5e6..c313dd9d87 100644 --- a/mars/contrib/dask/__init__.py +++ b/mars/contrib/dask/__init__.py @@ -13,5 +13,16 @@ # limitations under the License. # noinspection PyUnresolvedReferences -from .converter import convert_dask_collection -from .scheduler import mars_scheduler + + +def raise_dask_required_error(): + raise ImportError('you need to have dask installed for this to work') + + +try: + import dask +except ImportError: + convert_dask_collection = mars_scheduler = raise_dask_required_error +else: + from .converter import convert_dask_collection + from .scheduler import mars_scheduler diff --git a/mars/contrib/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py index f3773b6cc1..6449af112e 100644 --- a/mars/contrib/dask/tests/test_dask.py +++ b/mars/contrib/dask/tests/test_dask.py @@ -12,9 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import pytest - -from mars.experimental.dask import convert_dask_collection, mars_scheduler +from mars.contrib.dask import convert_dask_collection, mars_scheduler def test_delayed(): From a9e799f7749388934d5f3c94e9e0ba35a7438e7e Mon Sep 17 00:00:00 2001 From: loopyme Date: Mon, 9 Aug 2021 20:16:59 +0800 Subject: [PATCH 06/15] Fix typo --- mars/contrib/dask/tests/test_dask.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mars/contrib/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py index 6449af112e..4224c84c34 100644 --- a/mars/contrib/dask/tests/test_dask.py +++ b/mars/contrib/dask/tests/test_dask.py @@ -103,4 +103,4 @@ def test_bag(): assert dask_res == result.compute(scheduler=mars_scheduler) assert dask_res == list( convert_dask_collection(result).execute().fetch() - ) # TODO: dask-bag computation will return weired tuple, which we don't know why + ) # TODO: dask-bag computation will return weird tuple, which we don't know why From 0c59d77ad3808d1658bc477a84caf3c6adc963ff Mon Sep 17 00:00:00 2001 From: loopyme Date: Mon, 9 Aug 2021 20:42:32 +0800 Subject: [PATCH 07/15] Add dask requirements for dask-related tests --- mars/contrib/dask/tests/test_dask.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/mars/contrib/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py index 4224c84c34..803d1c4dc1 100644 --- a/mars/contrib/dask/tests/test_dask.py +++ b/mars/contrib/dask/tests/test_dask.py @@ -11,10 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import pytest from mars.contrib.dask import convert_dask_collection, mars_scheduler +try: + import dask +except ImportError: + dask = None + + +def require_dask_installed(func): + return pytest.mark.skipif(dask is None, reason='dask not installed')(func) + +@require_dask_installed def test_delayed(): from dask import delayed import numpy as np @@ -39,6 +50,7 @@ def calc_pi(fs, N): assert dask_res == convert_dask_collection(pi).execute().fetch() +@require_dask_installed def test_partitioned_dataframe(): import numpy as np import pandas as pd @@ -58,6 +70,7 @@ def test_partitioned_dataframe(): assert_frame_equal(dask_res, convert_dask_collection(df).execute().fetch(), check_index_type=False) +@require_dask_installed def test_unpartitioned_dataframe(): from dask import dataframe as dd from pandas._testing import assert_frame_equal @@ -75,6 +88,7 @@ def test_unpartitioned_dataframe(): assert_frame_equal(dask_res, convert_dask_collection(df).execute().fetch()) +@require_dask_installed def test_array(): import dask.array as da from numpy.core.numeric import array_equal @@ -88,6 +102,7 @@ def test_array(): assert array_equal(dask_res, convert_dask_collection(z).execute().fetch()) +@require_dask_installed def test_bag(): import dask From 4dc2fb5b68e4322b201a0758b1b5b4c494d156fc Mon Sep 17 00:00:00 2001 From: loopyme Date: Mon, 9 Aug 2021 20:43:41 +0800 Subject: [PATCH 08/15] Add mar session setup & teardown function --- mars/contrib/dask/tests/test_dask.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/mars/contrib/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py index 803d1c4dc1..cfb08def76 100644 --- a/mars/contrib/dask/tests/test_dask.py +++ b/mars/contrib/dask/tests/test_dask.py @@ -13,6 +13,7 @@ # limitations under the License. 
import pytest +from mars import new_session, stop_server from mars.contrib.dask import convert_dask_collection, mars_scheduler try: @@ -21,6 +22,14 @@ dask = None +def setup_function(): + new_session() + + +def teardown_function(): + stop_server() + + def require_dask_installed(func): return pytest.mark.skipif(dask is None, reason='dask not installed')(func) From c482709d3ff71de97725f7bbf37dce9d3c5c5113 Mon Sep 17 00:00:00 2001 From: loopyme Date: Tue, 10 Aug 2021 19:28:54 +0800 Subject: [PATCH 09/15] Add mimesis requirements for dask.bag tests --- mars/contrib/dask/tests/test_dask.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/mars/contrib/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py index cfb08def76..1921754994 100644 --- a/mars/contrib/dask/tests/test_dask.py +++ b/mars/contrib/dask/tests/test_dask.py @@ -21,6 +21,11 @@ except ImportError: dask = None +try: + import mimesis +except ImportError: + mimesis = None + def setup_function(): new_session() @@ -34,6 +39,10 @@ def require_dask_installed(func): return pytest.mark.skipif(dask is None, reason='dask not installed')(func) +def require_mimesis_installed(func): + return pytest.mark.skipif(mimesis is None, reason='mimesis not installed')(func) + + @require_dask_installed def test_delayed(): from dask import delayed @@ -112,6 +121,7 @@ def test_array(): @require_dask_installed +@require_mimesis_installed def test_bag(): import dask From cf8e1a6783375bfe15923fd81f7238b95baa2e13 Mon Sep 17 00:00:00 2001 From: loopyme Date: Fri, 13 Aug 2021 21:52:11 +0800 Subject: [PATCH 10/15] Change ValueError -> TypeError --- mars/contrib/dask/converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mars/contrib/dask/converter.py b/mars/contrib/dask/converter.py index df73c82b18..d66a5f441c 100644 --- a/mars/contrib/dask/converter.py +++ b/mars/contrib/dask/converter.py @@ -33,7 +33,7 @@ def convert_dask_collection(dc): Mars Object. 
""" if not is_dask_collection(dc): - raise ValueError(f"'{type(dc).__name__}' object is not a valid dask collection") + raise TypeError(f"'{type(dc).__name__}' object is not a valid dask collection") dc.__dask_graph__().validate() dsk = optimize(dc)[0].__dask_graph__() From 2f433f9e14a568446ec3844f7f00695a456a5a1d Mon Sep 17 00:00:00 2001 From: loopyme Date: Fri, 13 Aug 2021 22:09:38 +0800 Subject: [PATCH 11/15] Add test for raising error --- mars/contrib/dask/tests/test_dask.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mars/contrib/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py index 1921754994..0cbe452e2f 100644 --- a/mars/contrib/dask/tests/test_dask.py +++ b/mars/contrib/dask/tests/test_dask.py @@ -138,3 +138,9 @@ def test_bag(): assert dask_res == list( convert_dask_collection(result).execute().fetch() ) # TODO: dask-bag computation will return weird tuple, which we don't know why + + +@require_dask_installed +def test_dask_errors(): + with pytest.raises(TypeError): + convert_dask_collection({"foo": 0, "bar": 1}) From 908f1cef0700b231e1e92068d7e2ccaa14778ad5 Mon Sep 17 00:00:00 2001 From: loopyme Date: Fri, 13 Aug 2021 23:21:44 +0800 Subject: [PATCH 12/15] Use `lazy_import` way instead of redundant decorators --- mars/contrib/dask/tests/test_dask.py | 34 ++++++++-------------------- 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/mars/contrib/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py index 0cbe452e2f..51aa391946 100644 --- a/mars/contrib/dask/tests/test_dask.py +++ b/mars/contrib/dask/tests/test_dask.py @@ -15,16 +15,10 @@ from mars import new_session, stop_server from mars.contrib.dask import convert_dask_collection, mars_scheduler +from mars.utils import lazy_import -try: - import dask -except ImportError: - dask = None - -try: - import mimesis -except ImportError: - mimesis = None +dask_installed = lazy_import('dask', globals=globals()) is not None +mimesis_installed = lazy_import('mimesis', 
globals=globals()) is not None def setup_function(): @@ -35,15 +29,7 @@ def teardown_function(): stop_server() -def require_dask_installed(func): - return pytest.mark.skipif(dask is None, reason='dask not installed')(func) - - -def require_mimesis_installed(func): - return pytest.mark.skipif(mimesis is None, reason='mimesis not installed')(func) - - -@require_dask_installed +@pytest.mark.skipif(not dask_installed, reason='dask not installed') def test_delayed(): from dask import delayed import numpy as np @@ -68,7 +54,7 @@ def calc_pi(fs, N): assert dask_res == convert_dask_collection(pi).execute().fetch() -@require_dask_installed +@pytest.mark.skipif(not dask_installed, reason='dask not installed') def test_partitioned_dataframe(): import numpy as np import pandas as pd @@ -88,7 +74,7 @@ def test_partitioned_dataframe(): assert_frame_equal(dask_res, convert_dask_collection(df).execute().fetch(), check_index_type=False) -@require_dask_installed +@pytest.mark.skipif(not dask_installed, reason='dask not installed') def test_unpartitioned_dataframe(): from dask import dataframe as dd from pandas._testing import assert_frame_equal @@ -106,7 +92,7 @@ def test_unpartitioned_dataframe(): assert_frame_equal(dask_res, convert_dask_collection(df).execute().fetch()) -@require_dask_installed +@pytest.mark.skipif(not dask_installed, reason='dask not installed') def test_array(): import dask.array as da from numpy.core.numeric import array_equal @@ -120,8 +106,8 @@ def test_array(): assert array_equal(dask_res, convert_dask_collection(z).execute().fetch()) -@require_dask_installed -@require_mimesis_installed +@pytest.mark.skipif(not dask_installed, reason='dask not installed') +@pytest.mark.skipif(not mimesis_installed, reason='mimesis not installed') def test_bag(): import dask @@ -140,7 +126,7 @@ def test_bag(): ) # TODO: dask-bag computation will return weird tuple, which we don't know why -@require_dask_installed +@pytest.mark.skipif(not dask_installed, reason='dask not 
installed') def test_dask_errors(): with pytest.raises(TypeError): convert_dask_collection({"foo": 0, "bar": 1}) From 717b213d260c046feb77f96c604acf9a42b22e6f Mon Sep 17 00:00:00 2001 From: loopyme Date: Sat, 14 Aug 2021 16:50:52 +0800 Subject: [PATCH 13/15] Use `setup_cluster` fixture instead of setup & teardown function --- mars/contrib/dask/tests/test_dask.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/mars/contrib/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py index 51aa391946..db3dac07dd 100644 --- a/mars/contrib/dask/tests/test_dask.py +++ b/mars/contrib/dask/tests/test_dask.py @@ -13,7 +13,6 @@ # limitations under the License. import pytest -from mars import new_session, stop_server from mars.contrib.dask import convert_dask_collection, mars_scheduler from mars.utils import lazy_import @@ -21,16 +20,8 @@ mimesis_installed = lazy_import('mimesis', globals=globals()) is not None -def setup_function(): - new_session() - - -def teardown_function(): - stop_server() - - @pytest.mark.skipif(not dask_installed, reason='dask not installed') -def test_delayed(): +def test_delayed(setup_cluster): from dask import delayed import numpy as np @@ -55,7 +46,7 @@ def calc_pi(fs, N): @pytest.mark.skipif(not dask_installed, reason='dask not installed') -def test_partitioned_dataframe(): +def test_partitioned_dataframe(setup_cluster): import numpy as np import pandas as pd from dask import dataframe as dd @@ -75,7 +66,7 @@ def test_partitioned_dataframe(): @pytest.mark.skipif(not dask_installed, reason='dask not installed') -def test_unpartitioned_dataframe(): +def test_unpartitioned_dataframe(setup_cluster): from dask import dataframe as dd from pandas._testing import assert_frame_equal import pandas as pd @@ -93,7 +84,7 @@ def test_unpartitioned_dataframe(): @pytest.mark.skipif(not dask_installed, reason='dask not installed') -def test_array(): +def test_array(setup_cluster): import dask.array as da from 
numpy.core.numeric import array_equal @@ -108,7 +99,7 @@ def test_array(): @pytest.mark.skipif(not dask_installed, reason='dask not installed') @pytest.mark.skipif(not mimesis_installed, reason='mimesis not installed') -def test_bag(): +def test_bag(setup_cluster): import dask b = dask.datasets.make_people() # Make records of people From 6f4ef93f462eb17b82d77b708b46b86bc4238da9 Mon Sep 17 00:00:00 2001 From: loopyme Date: Mon, 16 Aug 2021 23:51:41 +0800 Subject: [PATCH 14/15] Add doc for Dask-on-Mars --- .../LC_MESSAGES/user_guide/contrib/dask.po | 84 +++++++++++++++++++ .../LC_MESSAGES/user_guide/contrib/index.po | 43 ++++++++++ docs/source/reference/contrib/reference.rst | 13 +++ docs/source/reference/index.rst | 1 + docs/source/user_guide/contrib/dask.rst | 64 ++++++++++++++ docs/source/user_guide/contrib/index.rst | 17 ++++ docs/source/user_guide/index.rst | 1 + mars/contrib/dask/scheduler.py | 6 +- 8 files changed, 227 insertions(+), 2 deletions(-) create mode 100644 docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po create mode 100644 docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/index.po create mode 100644 docs/source/reference/contrib/reference.rst create mode 100644 docs/source/user_guide/contrib/dask.rst create mode 100644 docs/source/user_guide/contrib/index.rst diff --git a/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po new file mode 100644 index 0000000000..2cbeb2b784 --- /dev/null +++ b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po @@ -0,0 +1,84 @@ +# SOME DESCRIPTIVE TITLE. +# Copyright (C) 1999-2020, The Alibaba Group Holding Ltd. +# This file is distributed under the same license as the mars package. +# FIRST AUTHOR , 2021. 
+# +#, fuzzy +msgid "" +msgstr "" +"Project-Id-Version: mars 0.8.0a1\n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2021-08-16 21:57+0800\n" +"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" +"Last-Translator: FULL NAME \n" +"Language-Team: LANGUAGE \n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=utf-8\n" +"Content-Transfer-Encoding: 8bit\n" +"Generated-By: Babel 2.9.1\n" + +#: ../../source/user_guide/contrib/dask.rst:5 +msgid "Dask on Mars" +msgstr "Dask on Mars" + +#: ../../source/user_guide/contrib/dask.rst:7 +msgid "" +"Dask-on-Mars provides a simple way to excute the entire Dask ecosystem on" +" top of Mars." +msgstr "Dask-on-Mars 使得用户能通过简单的 API 调用,在 Mars 中运行大部分 Dask 任务" + +#: ../../source/user_guide/contrib/dask.rst:9 +msgid "" +"`Dask `__ is a flexible library for parallel computing" +" in Python, geared towards scaling analytics and scientific computing " +"workloads. It provides `big data collections " +"`__ and Dynamic " +"task scheduling optimized for computation." +msgstr "Dask `__ 是一个用于并行计算的 Python 库,旨在" +"为大规模数据的分析和科学计算提供并行的计算解决方案。" + +#: ../../source/user_guide/contrib/dask.rst:15 +msgid "" +"For execution on Mars, you should *not* use the `Dask.distributed " +"`__ client, " +"simply use plain Dask collections and functionalities." +msgstr "为了在 Mars 上运行 Dask 任务,用户不应该使用 `Dask.distributed " +"`__ 相关特性," +"只需使用普通的 Dask 特性和功能" + +#: ../../source/user_guide/contrib/dask.rst:20 +msgid "Scheduler" +msgstr "使用 Dask 调度器" + +#: ../../source/user_guide/contrib/dask.rst:22 +msgid "" +"The main API for Dask-on-Mars is " +":meth:`mars.contrib.dask.mars_scheduler`. It uses Dask’s scheduler API, " +"which allows you to specify any callable as the scheduler that you would " +"like Dask to use to execute your workload." 
+msgstr "Dask-on-Mars 的主要接口是 :meth:`mars.contrib.dask.mars_scheduler`," +"它兼容了 Dask 的 scheduler 接口,这使得用户可以直接指定使用 mars_scheduler 来调度执行 Dask " +"任务。" + +#: ../../source/user_guide/contrib/dask.rst:39 +msgid "Convert Dask Collections" +msgstr "将 Dask 任务转变为 Mars 任务" + +#: ../../source/user_guide/contrib/dask.rst:41 +msgid "" +":meth:`mars.contrib.dask.convert_dask_collection` can be used when user " +"needs to manipulate dask collections with :ref:`Mars remote API `" +" or other features. It converts dask collections like delayed or dask-" +"dataframe to Mars Objects, which can be considered as results returned by" +" :meth:`mars.remote.spawn`." +msgstr "当用户需要使用 :ref:`Mars remote API ` 或其他 Mars 特性来对任务进行更改时," +"可以使用 :meth:`mars.contrib.dask.convert_dask_collection` 来将 Dask 任务转变为 Mars 任务。" +"这一函数返回的 Mars 对象与 :meth:`mars.remote.spawn` 返回的对象类型一致。" + + +#: ../../source/user_guide/contrib/dask.rst:63 +msgid "" +"Dask-on-Mars is an ongoing project. Please open an issue if you find that" +" one of these dask functionalities doesn’t run on Mars." +msgstr " Dask-on-Mars 是一个实验性的项目。如果您发现运行存在问题,请在Issue中报告。" + diff --git a/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/index.po b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/index.po new file mode 100644 index 0000000000..9f25baa28d --- /dev/null +++ b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/index.po @@ -0,0 +1,43 @@ +# SOME DESCRIPTIVE TITLE. +# Copyright (C) 1999-2020, The Alibaba Group Holding Ltd. +# This file is distributed under the same license as the mars package. +# FIRST AUTHOR , 2021. 
+# +#, fuzzy +msgid "" +msgstr "" +"Project-Id-Version: mars 0.8.0a1\n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2021-08-16 23:18+0800\n" +"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" +"Last-Translator: FULL NAME \n" +"Language-Team: LANGUAGE \n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=utf-8\n" +"Content-Transfer-Encoding: 8bit\n" +"Generated-By: Babel 2.9.1\n" + +#: ../../source/user_guide/contrib/index.rst:3 +msgid "Mars Contrib" +msgstr "Mars Contrib" + +#: ../../source/user_guide/contrib/index.rst:5 +msgid "Mars compatible module provides contributed functionalities." +msgstr "Mars Contrib 模块提供了与其他项目兼容相关的特性功能。" + +#: ../../source/user_guide/contrib/index.rst:8 +msgid "Dask on Mars" +msgstr "Dask on Mars" + +#: ../../source/user_guide/contrib/index.rst:10 +msgid "" +"Dask on Mars provides a simple way to execute the entire Dask ecosystem " +"on top of Mars." +msgstr "Dask-on-Mars 使得用户能通过简单的 API 调用,在 Mars 中运行大部分 Dask 任务" + +#: ../../source/user_guide/contrib/index.rst:12 +msgid "" +"Further information on any specific method can be obtained in the " +":ref:`API reference `." +msgstr "关于某个特定方法的更多信息,请参考 :ref:`API 参考手册 ` 。" + diff --git a/docs/source/reference/contrib/reference.rst b/docs/source/reference/contrib/reference.rst new file mode 100644 index 0000000000..726b302280 --- /dev/null +++ b/docs/source/reference/contrib/reference.rst @@ -0,0 +1,13 @@ +.. _dask_api: + +============ +Dask on Mars +============ + +.. currentmodule:: mars.contrib.dask + +.. 
autosummary:: + :toctree: generated/ + + scheduler.mars_scheduler + converter.convert_dask_collection diff --git a/docs/source/reference/index.rst b/docs/source/reference/index.rst index 412947c5b4..ddb4be4809 100644 --- a/docs/source/reference/index.rst +++ b/docs/source/reference/index.rst @@ -11,3 +11,4 @@ API reference dataframe/index learn/reference remote/reference + contrib/reference diff --git a/docs/source/user_guide/contrib/dask.rst b/docs/source/user_guide/contrib/dask.rst new file mode 100644 index 0000000000..a5d5798cc4 --- /dev/null +++ b/docs/source/user_guide/contrib/dask.rst @@ -0,0 +1,64 @@ +.. _dask: + +============ +Dask on Mars +============ + +Dask-on-Mars provides a simple way to excute the entire Dask ecosystem on top of Mars. + +`Dask `__ is a flexible library for parallel computing in Python, geared towards +scaling analytics and scientific computing workloads. It provides `big data collections +`__ and Dynamic task scheduling +optimized for computation. + +.. note:: + For execution on Mars, you should *not* use the + `Dask.distributed `__ + client, simply use plain Dask collections and functionalities. + +Scheduler +--------- + +The main API for Dask-on-Mars is :meth:`mars.contrib.dask.mars_scheduler`. It +uses Dask’s scheduler API, which allows you to specify any callable as the +scheduler that you would like Dask to use to execute your workload. + +.. code-block:: python + + >>> import dask + >>> from mars.contrib.dask import mars_scheduler + >>> + >>> def inc(x): + ...     return x + 1 + ... + >>> dask_task = dask.delayed(inc)(1) + >>> dask_task.compute(scheduler=mars_scheduler) # Run delayed object on top of Mars + 2 + +Convert Dask Collections +------------------------ + +:meth:`mars.contrib.dask.convert_dask_collection` can be used when user needs to +manipulate dask collections with :ref:`Mars remote API ` or other +features.
It converts dask collections like delayed or dask-dataframe to Mars Objects, +which can be considered as results returned by :meth:`mars.remote.spawn`. + +.. code-block:: python + + >>> import dask + >>> import mars.remote as mr + >>> from mars.contrib.dask import convert_dask_collection + >>> + >>> def inc(x): + ...     return x + 1 + ... + >>> dask_task = dask.delayed(inc)(1) + >>> mars_obj = convert_dask_collection(dask_task) # Convert Dask object to Mars object + >>> mars_task = mr.spawn(inc, args=(mars_obj,)) + >>> mars_task + Object + >>> mars_task.execute().fetch() + 3 + +Dask-on-Mars is an ongoing project. Please open an issue if you find that one of +these dask functionalities doesn’t run on Mars. \ No newline at end of file diff --git a/docs/source/user_guide/contrib/index.rst b/docs/source/user_guide/contrib/index.rst new file mode 100644 index 0000000000..635f868e52 --- /dev/null +++ b/docs/source/user_guide/contrib/index.rst @@ -0,0 +1,17 @@ +============ +Mars Contrib +============ + +Mars compatible module provides contributed functionalities. + +Dask on Mars +------------ + +Dask on Mars provides a simple way to execute the entire Dask ecosystem on top of Mars. + +Further information on any specific method can be obtained in the :ref:`API reference `. + +.. toctree:: + :maxdepth: 1 + + dask diff --git a/docs/source/user_guide/index.rst b/docs/source/user_guide/index.rst index ddbdadcfe8..54edd83125 100644 --- a/docs/source/user_guide/index.rst +++ b/docs/source/user_guide/index.rst @@ -11,3 +11,4 @@ User Guide dataframe/index learn/index remote/index + contrib/index diff --git a/mars/contrib/dask/scheduler.py b/mars/contrib/dask/scheduler.py index 46b2417d09..113187c92e 100644 --- a/mars/contrib/dask/scheduler.py +++ b/mars/contrib/dask/scheduler.py @@ -22,8 +22,10 @@ def mars_scheduler(dsk: dict, keys: List[List[str]]): """ - A Dask-Mars scheduler.
This scheduler is intended to be compatible - with existing dask user interface, no callbacks are implemented. + A Dask-Mars scheduler + + This scheduler is intended to be compatible with existing + dask user interface, no callbacks are implemented. Parameters ---------- From 66af6b9cb1321411e8ac09695f5baad6ee244c45 Mon Sep 17 00:00:00 2001 From: loopyme Date: Tue, 17 Aug 2021 07:43:53 +0800 Subject: [PATCH 15/15] Fix typo --- docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po | 2 +- docs/source/user_guide/contrib/dask.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po index 2cbeb2b784..ddf6d102f0 100644 --- a/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po +++ b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po @@ -23,7 +23,7 @@ msgstr "Dask on Mars" #: ../../source/user_guide/contrib/dask.rst:7 msgid "" -"Dask-on-Mars provides a simple way to excute the entire Dask ecosystem on" +"Dask-on-Mars provides a simple way to execute the entire Dask ecosystem on" " top of Mars." msgstr "Dask-on-Mars 使得用户能通过简单的 API 调用,在 Mars 中运行大部分 Dask 任务" diff --git a/docs/source/user_guide/contrib/dask.rst b/docs/source/user_guide/contrib/dask.rst index a5d5798cc4..e97aac149f 100644 --- a/docs/source/user_guide/contrib/dask.rst +++ b/docs/source/user_guide/contrib/dask.rst @@ -4,7 +4,7 @@ Dask on Mars ============ -Dask-on-Mars provides a simple way to excute the entire Dask ecosystem on top of Mars. +Dask-on-Mars provides a simple way to execute the entire Dask ecosystem on top of Mars. `Dask `__ is a flexible library for parallel computing in Python, geared towards scaling analytics and scientific computing workloads. It provides `big data collections