diff --git a/.github/workflows/platform-ci.yml b/.github/workflows/platform-ci.yml index c968f18668..a5c06f1863 100644 --- a/.github/workflows/platform-ci.yml +++ b/.github/workflows/platform-ci.yml @@ -9,7 +9,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-vineyard] + python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-vineyard, 3.8-dask] include: - { os: ubuntu-latest, python-version: 3.8-kubernetes, no-common-tests: 1, no-deploy: 1, with-kubernetes: "with Kubernetes" } @@ -19,6 +19,8 @@ jobs: no-deploy: 1, with-vineyard: "with vineyard" } - { os: ubuntu-latest, python-version: 3.8-ray, no-common-tests: 1, no-deploy: 1, with-ray: "with ray" } + - { os: ubuntu-latest, python-version: 3.8-dask, no-common-tests: 1, + no-deploy: 1, run-dask: "run dask" } steps: - name: Check out code @@ -40,6 +42,7 @@ jobs: WITH_KUBERNETES: ${{ matrix.with-kubernetes }} WITH_VINEYARD: ${{ matrix.with-vineyard }} WITH_RAY: ${{ matrix.with-ray }} + RUN_DASK: ${{ matrix.run-dask }} NO_COMMON_TESTS: ${{ matrix.no-common-tests }} shell: bash run: | @@ -88,6 +91,9 @@ jobs: # remove ray version after https://github.com/ray-project/ray/issues/16802 got fixed pip install ray[default]==1.3.0 fi + if [ -n "$RUN_DASK" ]; then + pip install dask[complete] mimesis sklearn + fi fi conda list -n test @@ -98,6 +104,7 @@ jobs: WITH_CYTHON: ${{ matrix.with-cython }} WITH_VINEYARD: ${{ matrix.with-vineyard }} WITH_RAY: ${{ matrix.with-ray }} + RUN_DASK: ${{ matrix.run-dask }} NO_COMMON_TESTS: ${{ matrix.no-common-tests }} NUMPY_EXPERIMENTAL_ARRAY_FUNCTION: 1 CHANGE_MINIKUBE_NONE_USER: true @@ -127,6 +134,10 @@ jobs: pytest $PYTEST_CONFIG --timeout=120 -m ray coverage report fi + if [ -n "$RUN_DASK" ]; then + pytest $PYTEST_CONFIG mars/contrib/dask/tests/test_dask.py + coverage report + fi coverage xml - name: Stop vineyard runtime diff --git a/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po new file mode 100644 index 0000000000..ddf6d102f0 --- /dev/null +++ b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po @@ -0,0 +1,84 @@ +# SOME DESCRIPTIVE TITLE. +# Copyright (C) 1999-2020, The Alibaba Group Holding Ltd. +# This file is distributed under the same license as the mars package. +# FIRST AUTHOR , 2021. +# +#, fuzzy +msgid "" +msgstr "" +"Project-Id-Version: mars 0.8.0a1\n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2021-08-16 21:57+0800\n" +"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" +"Last-Translator: FULL NAME \n" +"Language-Team: LANGUAGE \n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=utf-8\n" +"Content-Transfer-Encoding: 8bit\n" +"Generated-By: Babel 2.9.1\n" + +#: ../../source/user_guide/contrib/dask.rst:5 +msgid "Dask on Mars" +msgstr "Dask on Mars" + +#: ../../source/user_guide/contrib/dask.rst:7 +msgid "" +"Dask-on-Mars provides a simple way to execute the entire Dask ecosystem on" +" top of Mars." +msgstr "Dask-on-Mars 使得用户能通过简单的 API 调用,在 Mars 中运行大部分 Dask 任务" + +#: ../../source/user_guide/contrib/dask.rst:9 +msgid "" +"`Dask `__ is a flexible library for parallel computing" +" in Python, geared towards scaling analytics and scientific computing " +"workloads. It provides `big data collections " +"`__ and Dynamic " +"task scheduling optimized for computation." +msgstr "Dask `__ 是一个用于并行计算的 Python 库,旨在" +"为大规模数据的分析和科学计算提供并行的计算解决方案。" + +#: ../../source/user_guide/contrib/dask.rst:15 +msgid "" +"For execution on Mars, you should *not* use the `Dask.distributed " +"`__ client, " +"simply use plain Dask collections and functionalities." +msgstr "为了在 Mars 上运行 Dask 任务,用户不应该使用 `Dask.distributed " +"`__ 相关特性," +"只需使用普通的 Dask 特性和功能" + +#: ../../source/user_guide/contrib/dask.rst:20 +msgid "Scheduler" +msgstr "使用 Dask 调度器" + +#: ../../source/user_guide/contrib/dask.rst:22 +msgid "" +"The main API for Dask-on-Mars is " +":meth:`mars.contrib.dask.mars_scheduler`. It uses Dask’s scheduler API, " +"which allows you to specify any callable as the scheduler that you would " +"like Dask to use to execute your workload." +msgstr "Dask-on-Mars 的主要接口是 :meth:`mars.contrib.dask.mars_scheduler`," +"它兼容了 Dask 的 scheduler 接口,这使得用户可以直接指定使用 mars_scheduler 来调度执行 Dask " +"任务。" + +#: ../../source/user_guide/contrib/dask.rst:39 +msgid "Convert Dask Collections" +msgstr "将 Dask 任务转变为 Mars 任务" + +#: ../../source/user_guide/contrib/dask.rst:41 +msgid "" +":meth:`mars.contrib.dask.convert_dask_collection` can be used when user " +"needs to manipulate dask collections with :ref:`Mars remote API `" +" or other features. It converts dask collections like delayed or dask-" +"dataframe to Mars Objects, which can be considered as results returned by" +" :meth:`mars.remote.spawn`." +msgstr "当用户需要使用 :ref:`Mars remote API ` 或其他 Mars 特性来对任务进行更改时," +"可以使用 :meth:`mars.contrib.dask.convert_dask_collection` 来将 Dask 任务转变为 Mars 任务。" +"这一函数返回的 Mars 对象与 :meth:`mars.remote.spawn` 返回的对象类型一致。" + + +#: ../../source/user_guide/contrib/dask.rst:63 +msgid "" +"Dask-on-Mars is an ongoing project. Please open an issue if you find that" +" one of these dask functionalities doesn’t run on Mars." +msgstr " Dask-on-Mars 是一个实验性的项目。如果您发现运行存在问题,请在Issue中报告。" + diff --git a/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/index.po b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/index.po new file mode 100644 index 0000000000..9f25baa28d --- /dev/null +++ b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/index.po @@ -0,0 +1,43 @@ +# SOME DESCRIPTIVE TITLE. +# Copyright (C) 1999-2020, The Alibaba Group Holding Ltd. +# This file is distributed under the same license as the mars package. +# FIRST AUTHOR , 2021. +# +#, fuzzy +msgid "" +msgstr "" +"Project-Id-Version: mars 0.8.0a1\n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2021-08-16 23:18+0800\n" +"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" +"Last-Translator: FULL NAME \n" +"Language-Team: LANGUAGE \n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=utf-8\n" +"Content-Transfer-Encoding: 8bit\n" +"Generated-By: Babel 2.9.1\n" + +#: ../../source/user_guide/contrib/index.rst:3 +msgid "Mars Contrib" +msgstr "Mars Contrib" + +#: ../../source/user_guide/contrib/index.rst:5 +msgid "Mars compatible module provides contributed functionalities." +msgstr "Mars Contrib 模块提供了与其他项目兼容相关的特性功能。" + +#: ../../source/user_guide/contrib/index.rst:8 +msgid "Dask on Mars" +msgstr "Dask on Mars" + +#: ../../source/user_guide/contrib/index.rst:10 +msgid "" +"Dask on Mars provides a simple way to execute the entire Dask ecosystem " +"on top of Mars." +msgstr "Dask-on-Mars 使得用户能通过简单的 API 调用,在 Mars 中运行大部分 Dask 任务" + +#: ../../source/user_guide/contrib/index.rst:12 +msgid "" +"Further information on any specific method can be obtained in the " +":ref:`API reference `." +msgstr "关于某个特定方法的更多信息,请参考 :ref:`API 参考手册 ` 。" + diff --git a/docs/source/reference/contrib/reference.rst b/docs/source/reference/contrib/reference.rst new file mode 100644 index 0000000000..726b302280 --- /dev/null +++ b/docs/source/reference/contrib/reference.rst @@ -0,0 +1,13 @@ +.. _dask_api: + +============ +Dask on Mars +============ + +.. currentmodule:: mars.contrib.dask + +.. autosummary:: + :toctree: generated/ + + scheduler.mars_scheduler + converter.convert_dask_collection diff --git a/docs/source/reference/index.rst b/docs/source/reference/index.rst index 412947c5b4..ddb4be4809 100644 --- a/docs/source/reference/index.rst +++ b/docs/source/reference/index.rst @@ -11,3 +11,4 @@ API reference dataframe/index learn/reference remote/reference + contrib/reference diff --git a/docs/source/user_guide/contrib/dask.rst b/docs/source/user_guide/contrib/dask.rst new file mode 100644 index 0000000000..e97aac149f --- /dev/null +++ b/docs/source/user_guide/contrib/dask.rst @@ -0,0 +1,64 @@ +.. _dask: + +============ +Dask on Mars +============ + +Dask-on-Mars provides a simple way to execute the entire Dask ecosystem on top of Mars. + +`Dask `__ is a flexible library for parallel computing in Python, geared towards +scaling analytics and scientific computing workloads. It provides `big data collections +`__ and Dynamic task scheduling +optimized for computation. + +.. note:: + For execution on Mars, you should *not* use the + `Dask.distributed `__ + client, simply use plain Dask collections and functionalities. + +Scheduler +--------- + +The main API for Dask-on-Mars is :meth:`mars.contrib.dask.mars_scheduler`. It +uses Dask’s scheduler API, which allows you to specify any callable as the +scheduler that you would like Dask to use to execute your workload. + +.. code-block:: python + + >>> import dask + >>> from mars.contrib.dask import mars_scheduler + >>> + >>> def inc(x): + >>> return x + 1 + >>> + >>> dask_task = dask.delayed(inc)(1) + >>> dask_task.compute(scheduler=mars_scheduler) # Run delayed object on top of Mars + 2 + +Convert Dask Collections +------------------------ + +:meth:`mars.contrib.dask.convert_dask_collection` can be used when user needs to +manipulate dask collections with :ref:`Mars remote API ` or other +features. It converts dask collections like delayed or dask-dataframe to Mars Objects, +which can be considered as results returned by :meth:`mars.remote.spawn`. + +.. code-block:: python + + >>> import dask + >>> import mars.remote as mr + >>> from mars.contrib.dask import convert_dask_collection + >>> + >>> def inc(x): + >>> return x + 1 + >>> + >>> dask_task = dask.delayed(inc)(1) + >>> mars_obj = convert_dask_collection(dask_task) # Convert Dask object to Mars object + >>> mars_task = mr.spawn(inc, args=(mars_obj,)) + >>> mars_task + Object + >>> mars_task.execute().fetch() + 3 + +Dask-on-Mars is an ongoing project. Please open an issue if you find that one of +these dask functionalities doesn’t run on Mars. \ No newline at end of file diff --git a/docs/source/user_guide/contrib/index.rst b/docs/source/user_guide/contrib/index.rst new file mode 100644 index 0000000000..635f868e52 --- /dev/null +++ b/docs/source/user_guide/contrib/index.rst @@ -0,0 +1,17 @@ +============ +Mars Contrib +============ + +Mars compatible module provides contributed functionalities. + +Dask on Mars +------------ + +Dask on Mars provides a simple way to execute the entire Dask ecosystem on top of Mars. + +Further information on any specific method can be obtained in the :ref:`API reference `. + +.. toctree:: + :maxdepth: 1 + + dask diff --git a/docs/source/user_guide/index.rst b/docs/source/user_guide/index.rst index ddbdadcfe8..54edd83125 100644 --- a/docs/source/user_guide/index.rst +++ b/docs/source/user_guide/index.rst @@ -11,3 +11,4 @@ User Guide dataframe/index learn/index remote/index + contrib/index diff --git a/mars/contrib/__init__.py b/mars/contrib/__init__.py new file mode 100644 index 0000000000..c71e83c08e --- /dev/null +++ b/mars/contrib/__init__.py @@ -0,0 +1,13 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/mars/contrib/dask/__init__.py b/mars/contrib/dask/__init__.py new file mode 100644 index 0000000000..c313dd9d87 --- /dev/null +++ b/mars/contrib/dask/__init__.py @@ -0,0 +1,28 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# noinspection PyUnresolvedReferences + + +def raise_dask_required_error(): + raise ImportError('you need to have dask installed for this to work') + + +try: + import dask +except ImportError: + convert_dask_collection = mars_scheduler = raise_dask_required_error +else: + from .converter import convert_dask_collection + from .scheduler import mars_scheduler diff --git a/mars/contrib/dask/converter.py b/mars/contrib/dask/converter.py new file mode 100644 index 0000000000..d66a5f441c --- /dev/null +++ b/mars/contrib/dask/converter.py @@ -0,0 +1,50 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dask import is_dask_collection, optimize + +from .scheduler import mars_dask_get +from .utils import reduce + + +def convert_dask_collection(dc): + """ + Convert dask collection object into mars.core.Object via remote API + + Parameters + ---------- + dc: dask collection + Dask collection object to be converted. + + Returns + ------- + Object + Mars Object. + """ + if not is_dask_collection(dc): + raise TypeError(f"'{type(dc).__name__}' object is not a valid dask collection") + + dc.__dask_graph__().validate() + dsk = optimize(dc)[0].__dask_graph__() + + first_key = next(iter(dsk.keys())) + if isinstance(first_key, str): + key = [first_key] + elif isinstance(first_key, tuple): + key = sorted([i for i in dsk.keys() if i[0] == first_key[0]], key=lambda x: x[1]) + else: + raise ValueError( + f"Dask collection object seems be broken, with unexpected key type:'{type(first_key).__name__}'") + + return reduce(mars_dask_get(dsk, [key])) diff --git a/mars/contrib/dask/scheduler.py b/mars/contrib/dask/scheduler.py new file mode 100644 index 0000000000..113187c92e --- /dev/null +++ b/mars/contrib/dask/scheduler.py @@ -0,0 +1,84 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Tuple + +from dask.core import istask, ishashable + +from .utils import reduce +from ...remote import spawn + + +def mars_scheduler(dsk: dict, keys: List[List[str]]): + """ + A Dask-Mars scheduler + + This scheduler is intended to be compatible with existing + dask user interface, no callbacks are implemented. + + Parameters + ---------- + dsk: Dict + Dask graph, represented as a task DAG dictionary. + keys: List[List[str]] + 2d-list of Dask graph keys whose values we wish to compute and return. + + Returns + ------- + Object + Computed values corresponding to the provided keys. + """ + + return [[reduce(mars_dask_get(dsk, keys)).execute().fetch()]] + + +def mars_dask_get(dsk: dict, keys: List[List]): + """ + A Dask-Mars convert function. This function will send the dask graph layers + to Mars Remote API, generating mars objects correspond to the provided keys. + + Parameters + ---------- + dsk: Dict + Dask graph, represented as a task DAG dictionary. + keys: List[List[str]] + 2d-list of Dask graph keys whose values we wish to compute and return. + + Returns + ------- + Object + Spawned mars objects corresponding to the provided keys. + """ + + def _get_arg(a): + # if arg contains layer index or callable objs, handle it + if ishashable(a) and a in dsk.keys(): + while ishashable(a) and a in dsk.keys(): + a = dsk[a] + return _execute_task(a) + elif not isinstance(a, str) and hasattr(a, "__getitem__"): + if istask(a): # TODO:Handle `SubgraphCallable`, which may contains dsk in it + return spawn(a[0], args=tuple(_get_arg(i) for i in a[1:])) + elif isinstance(a, dict): + return {k: _get_arg(v) for k, v in a.items()} + elif isinstance(a, List) or isinstance(a, Tuple): + return type(a)(_get_arg(i) for i in a) + return a + + def _execute_task(task: tuple): + if not istask(task): + return _get_arg(task) + return spawn(task[0], args=tuple(_get_arg(a) for a in task[1:])) + + return [[_execute_task(dsk[k]) for k in keys_d] for keys_d in keys] diff --git a/mars/contrib/dask/tests/__init__.py b/mars/contrib/dask/tests/__init__.py new file mode 100644 index 0000000000..7f34198853 --- /dev/null +++ b/mars/contrib/dask/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/mars/contrib/dask/tests/test_dask.py b/mars/contrib/dask/tests/test_dask.py new file mode 100644 index 0000000000..db3dac07dd --- /dev/null +++ b/mars/contrib/dask/tests/test_dask.py @@ -0,0 +1,123 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest + +from mars.contrib.dask import convert_dask_collection, mars_scheduler +from mars.utils import lazy_import + +dask_installed = lazy_import('dask', globals=globals()) is not None +mimesis_installed = lazy_import('mimesis', globals=globals()) is not None + + +@pytest.mark.skipif(not dask_installed, reason='dask not installed') +def test_delayed(setup_cluster): + from dask import delayed + import numpy as np + + def calc_chunk(n: int, i: int): + rs = np.random.RandomState(i) + a = rs.uniform(-1, 1, size=(n, 2)) + d = np.linalg.norm(a, axis=1) + return (d < 1).sum() + + def calc_pi(fs, N): + return sum(fs) * 4 / N + + N = 200_000_000 + n = 10_000_000 + + fs = [delayed(calc_chunk)(n, i) for i in range(N // n)] + pi = delayed(calc_pi)(fs, N) + + dask_res = pi.compute() + assert dask_res == pi.compute(scheduler=mars_scheduler) + assert dask_res == convert_dask_collection(pi).execute().fetch() + + +@pytest.mark.skipif(not dask_installed, reason='dask not installed') +def test_partitioned_dataframe(setup_cluster): + import numpy as np + import pandas as pd + from dask import dataframe as dd + from pandas._testing import assert_frame_equal + + data = np.random.randn(10000, 100) + df = dd.from_pandas( + pd.DataFrame(data, columns=[f"col{i}" for i in range(100)]), npartitions=4 + ) + df["col0"] = df["col0"] + df["col1"] / 2 + col2_mean = df["col2"].mean() + df = df[df["col2"] > col2_mean] + + dask_res = df.compute() + assert_frame_equal(dask_res, df.compute(scheduler=mars_scheduler), check_index_type=False) + assert_frame_equal(dask_res, convert_dask_collection(df).execute().fetch(), check_index_type=False) + + +@pytest.mark.skipif(not dask_installed, reason='dask not installed') +def test_unpartitioned_dataframe(setup_cluster): + from dask import dataframe as dd + from pandas._testing import assert_frame_equal + import pandas as pd + from sklearn.datasets import load_boston + + boston = load_boston() + pd.DataFrame(boston.data, columns=boston['feature_names']).to_csv("./boston_housing_data.csv") + + df = dd.read_csv(r"./boston_housing_data.csv") + df["CRIM"] = df["CRIM"] / 2 + + dask_res = df.compute() + assert_frame_equal(dask_res, df.compute(scheduler=mars_scheduler)) + assert_frame_equal(dask_res, convert_dask_collection(df).execute().fetch()) + + +@pytest.mark.skipif(not dask_installed, reason='dask not installed') +def test_array(setup_cluster): + import dask.array as da + from numpy.core.numeric import array_equal + + x = da.random.random((10000, 10000), chunks=(1000, 1000)) + y = x + x.T + z = y[::2, 5000:].mean(axis=1) + + dask_res = z.compute() + assert array_equal(dask_res, z.compute(scheduler=mars_scheduler)) + assert array_equal(dask_res, convert_dask_collection(z).execute().fetch()) + + +@pytest.mark.skipif(not dask_installed, reason='dask not installed') +@pytest.mark.skipif(not mimesis_installed, reason='mimesis not installed') +def test_bag(setup_cluster): + import dask + + b = dask.datasets.make_people() # Make records of people + result = ( + b.filter(lambda record: record["age"] > 30) + .map(lambda record: record["occupation"]) + .frequencies(sort=True) + .topk(10, key=1) + ) + + dask_res = result.compute() + assert dask_res == result.compute(scheduler=mars_scheduler) + assert dask_res == list( + convert_dask_collection(result).execute().fetch() + ) # TODO: dask-bag computation will return weird tuple, which we don't know why + + +@pytest.mark.skipif(not dask_installed, reason='dask not installed') +def test_dask_errors(): + with pytest.raises(TypeError): + convert_dask_collection({"foo": 0, "bar": 1}) diff --git a/mars/contrib/dask/utils.py b/mars/contrib/dask/utils.py new file mode 100644 index 0000000000..66ab4a2312 --- /dev/null +++ b/mars/contrib/dask/utils.py @@ -0,0 +1,69 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List + +from dask import is_dask_collection +from dask.array.core import _concatenate2 as array_concat +from dask.dataframe import concat as df_concat +from dask.utils import is_arraylike, is_dataframe_like, is_series_like, is_index_like + +from ...remote import spawn + + +def concat(objs: List): + """ + Concat the results of partitioned dask task executions. This function guess the + types of resulting list, then calls the corresponding native dask concat functions. + + Parameters + ---------- + objs: List + List of the partitioned dask task execution results, which will be concat. + + Returns + ------- + obj: + The concat result + + """ + if is_arraylike(objs[0]): + res = array_concat(objs, axes=[0]) # TODO: Add concat with args support + elif any((is_dataframe_like(objs[0]), is_series_like(objs[0]), is_index_like(objs[0]))): + res = df_concat(objs) + else: + res = objs + while isinstance(res, List): + res = res[0] + return res.compute() if is_dask_collection(res) else res + + +def reduce(objs: List[List]): + """ + Spawn a concat task for 2d-list objects + + Parameters + ---------- + objs: List + 2d-list of the partitioned dask task execution results, which will be concat. + + Returns + ------- + obj: + The spawning concat task. + """ + return spawn( + concat, + args=([spawn(concat, args=(objs_d,)) for objs_d in objs],), + )