Add experimental Dask-on-Mars support #2289

Merged
merged 15 commits into from
Aug 17, 2021
13 changes: 12 additions & 1 deletion .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-vineyard]
python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-vineyard, 3.8-dask]
include:
- { os: ubuntu-latest, python-version: 3.8-kubernetes, no-common-tests: 1,
no-deploy: 1, with-kubernetes: "with Kubernetes" }
Expand All @@ -19,6 +19,8 @@ jobs:
no-deploy: 1, with-vineyard: "with vineyard" }
- { os: ubuntu-latest, python-version: 3.8-ray, no-common-tests: 1,
no-deploy: 1, with-ray: "with ray" }
- { os: ubuntu-latest, python-version: 3.8-dask, no-common-tests: 1,
no-deploy: 1, run-dask: "run dask" }

steps:
- name: Check out code
Expand All @@ -40,6 +42,7 @@ jobs:
WITH_KUBERNETES: ${{ matrix.with-kubernetes }}
WITH_VINEYARD: ${{ matrix.with-vineyard }}
WITH_RAY: ${{ matrix.with-ray }}
RUN_DASK: ${{ matrix.run-dask }}
NO_COMMON_TESTS: ${{ matrix.no-common-tests }}
shell: bash
run: |
Expand Down Expand Up @@ -88,6 +91,9 @@ jobs:
# remove ray version after https://github.com/ray-project/ray/issues/16802 got fixed
pip install ray[default]==1.3.0
fi
if [ -n "$RUN_DASK" ]; then
pip install dask[complete] mimesis scikit-learn
fi
fi
conda list -n test

Expand All @@ -98,6 +104,7 @@ jobs:
WITH_CYTHON: ${{ matrix.with-cython }}
WITH_VINEYARD: ${{ matrix.with-vineyard }}
WITH_RAY: ${{ matrix.with-ray }}
RUN_DASK: ${{ matrix.run-dask }}
NO_COMMON_TESTS: ${{ matrix.no-common-tests }}
NUMPY_EXPERIMENTAL_ARRAY_FUNCTION: 1
CHANGE_MINIKUBE_NONE_USER: true
Expand Down Expand Up @@ -127,6 +134,10 @@ jobs:
pytest $PYTEST_CONFIG --timeout=120 -m ray
coverage report
fi
if [ -n "$RUN_DASK" ]; then
pytest $PYTEST_CONFIG mars/contrib/dask/tests/test_dask.py
coverage report
fi
coverage xml

- name: Stop vineyard runtime
Expand Down
84 changes: 84 additions & 0 deletions docs/source/locale/zh_CN/LC_MESSAGES/user_guide/contrib/dask.po
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) 1999-2020, The Alibaba Group Holding Ltd.
# This file is distributed under the same license as the mars package.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2021.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: mars 0.8.0a1\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2021-08-16 21:57+0800\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.9.1\n"

#: ../../source/user_guide/contrib/dask.rst:5
msgid "Dask on Mars"
msgstr "Dask on Mars"

#: ../../source/user_guide/contrib/dask.rst:7
msgid ""
"Dask-on-Mars provides a simple way to execute the entire Dask ecosystem on"
" top of Mars."
msgstr "Dask-on-Mars 使得用户能通过简单的 API 调用,在 Mars 中运行大部分 Dask 任务"

#: ../../source/user_guide/contrib/dask.rst:9
msgid ""
"`Dask <https://dask.org/>`__ is a flexible library for parallel computing"
" in Python, geared towards scaling analytics and scientific computing "
"workloads. It provides `big data collections "
"<https://docs.dask.org/en/latest/user-interfaces.html>`__ and Dynamic "
"task scheduling optimized for computation."
msgstr "`Dask <https://dask.org/>`__ 是一个用于并行计算的 Python 库,旨在"
"为大规模数据的分析和科学计算提供并行的计算解决方案。"

#: ../../source/user_guide/contrib/dask.rst:15
msgid ""
"For execution on Mars, you should *not* use the `Dask.distributed "
"<https://distributed.dask.org/en/latest/quickstart.html>`__ client, "
"simply use plain Dask collections and functionalities."
msgstr "为了在 Mars 上运行 Dask 任务,用户不应该使用 `Dask.distributed "
"<https://distributed.dask.org/en/latest/quickstart.html>`__ 相关特性,"
"只需使用普通的 Dask 特性和功能"

#: ../../source/user_guide/contrib/dask.rst:20
msgid "Scheduler"
msgstr "使用 Dask 调度器"

#: ../../source/user_guide/contrib/dask.rst:22
msgid ""
"The main API for Dask-on-Mars is "
":meth:`mars.contrib.dask.mars_scheduler`. It uses Dask’s scheduler API, "
"which allows you to specify any callable as the scheduler that you would "
"like Dask to use to execute your workload."
msgstr "Dask-on-Mars 的主要接口是 :meth:`mars.contrib.dask.mars_scheduler`,"
"它兼容了 Dask 的 scheduler 接口,这使得用户可以直接指定使用 mars_scheduler 来调度执行 Dask "
"任务。"

#: ../../source/user_guide/contrib/dask.rst:39
msgid "Convert Dask Collections"
msgstr "将 Dask 任务转变为 Mars 任务"

#: ../../source/user_guide/contrib/dask.rst:41
msgid ""
":meth:`mars.contrib.dask.convert_dask_collection` can be used when user "
"needs to manipulate dask collections with :ref:`Mars remote API <remote>`"
" or other features. It converts dask collections like delayed or dask-"
"dataframe to Mars Objects, which can be considered as results returned by"
" :meth:`mars.remote.spawn`."
msgstr "当用户需要使用 :ref:`Mars remote API <remote>` 或其他 Mars 特性来对任务进行更改时,"
"可以使用 :meth:`mars.contrib.dask.convert_dask_collection` 来将 Dask 任务转变为 Mars 任务。"
"这一函数返回的 Mars 对象与 :meth:`mars.remote.spawn` 返回的对象类型一致。"


#: ../../source/user_guide/contrib/dask.rst:63
msgid ""
"Dask-on-Mars is an ongoing project. Please open an issue if you find that"
" one of these dask functionalities doesn’t run on Mars."
msgstr "Dask-on-Mars 是一个实验性的项目。如果您发现运行存在问题,请在 Issue 中报告。"

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) 1999-2020, The Alibaba Group Holding Ltd.
# This file is distributed under the same license as the mars package.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2021.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: mars 0.8.0a1\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2021-08-16 23:18+0800\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.9.1\n"

#: ../../source/user_guide/contrib/index.rst:3
msgid "Mars Contrib"
msgstr "Mars Contrib"

#: ../../source/user_guide/contrib/index.rst:5
msgid "Mars compatible module provides contributed functionalities."
msgstr "Mars Contrib 模块提供了与其他项目兼容相关的特性功能。"

#: ../../source/user_guide/contrib/index.rst:8
msgid "Dask on Mars"
msgstr "Dask on Mars"

#: ../../source/user_guide/contrib/index.rst:10
msgid ""
"Dask on Mars provides a simple way to execute the entire Dask ecosystem "
"on top of Mars."
msgstr "Dask-on-Mars 使得用户能通过简单的 API 调用,在 Mars 中运行大部分 Dask 任务"

#: ../../source/user_guide/contrib/index.rst:12
msgid ""
"Further information on any specific method can be obtained in the "
":ref:`API reference <dask_api>`."
msgstr "关于某个特定方法的更多信息,请参考 :ref:`API 参考手册 <dask_api>` 。"

13 changes: 13 additions & 0 deletions docs/source/reference/contrib/reference.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.. _dask_api:

============
Dask on Mars
============

.. currentmodule:: mars.contrib.dask

.. autosummary::
   :toctree: generated/

   scheduler.mars_scheduler
   converter.convert_dask_collection
1 change: 1 addition & 0 deletions docs/source/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ API reference
dataframe/index
learn/reference
remote/reference
contrib/reference
64 changes: 64 additions & 0 deletions docs/source/user_guide/contrib/dask.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
.. _dask:

============
Dask on Mars
============

Dask-on-Mars provides a simple way to execute the entire Dask ecosystem on top of Mars.

`Dask <https://dask.org/>`__ is a flexible library for parallel computing in Python, geared towards
scaling analytics and scientific computing workloads. It provides `big data collections
<https://docs.dask.org/en/latest/user-interfaces.html>`__ and Dynamic task scheduling
optimized for computation.

.. note::
    For execution on Mars, you should *not* use the
    `Dask.distributed <https://distributed.dask.org/en/latest/quickstart.html>`__
    client, simply use plain Dask collections and functionalities.

Scheduler
---------

The main API for Dask-on-Mars is :meth:`mars.contrib.dask.mars_scheduler`. It
uses Dask’s scheduler API, which allows you to specify any callable as the
scheduler that you would like Dask to use to execute your workload.

.. code-block:: python

    >>> import dask
    >>> from mars.contrib.dask import mars_scheduler
    >>>
    >>> def inc(x):
    ...     return x + 1
    ...
    >>> dask_task = dask.delayed(inc)(1)
    >>> dask_task.compute(scheduler=mars_scheduler)  # Run delayed object on top of Mars
    2
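
To make "any callable as the scheduler" more concrete, here is a minimal sketch of what such a callable can look like. It assumes the conventional ``get(dsk, keys, **kwargs)`` signature and the classic dict-and-tuple task graph layout; ``simple_get`` is a hypothetical toy written for illustration and is not how ``mars_scheduler`` is implemented.

```python
from operator import add


def simple_get(dsk, keys, **kwargs):
    """Toy synchronous scheduler for a dict-based, tuple-style task graph.

    ``dsk`` maps keys to literal values or tasks. A task is a tuple whose
    first element is a callable; the remaining elements are its arguments
    (literals, keys, lists of either, or nested tasks).
    """
    cache = {}

    def is_key(obj):
        # Unhashable objects (e.g. tuples containing lists) cannot be keys.
        try:
            return obj in dsk
        except TypeError:
            return False

    def evaluate(obj):
        if isinstance(obj, list):
            # The result mirrors the (possibly nested) list structure.
            return [evaluate(item) for item in obj]
        if is_key(obj):
            if obj not in cache:
                cache[obj] = evaluate(dsk[obj])
            return cache[obj]
        if isinstance(obj, tuple) and obj and callable(obj[0]):
            func, *args = obj
            return func(*[evaluate(arg) for arg in args])
        return obj  # plain literal

    return evaluate(keys)


graph = {
    "a": 1,
    "b": (add, "a", 10),      # a + 10
    "c": (sum, ["a", "b"]),   # a + b
}
result = simple_get(graph, "c")  # 1 + (1 + 10) = 12
```

A real scheduler such as the one in this PR additionally has to ship each task to a remote worker; the sketch only shows the shape of the interface.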

Convert Dask Collections
------------------------

:meth:`mars.contrib.dask.convert_dask_collection` can be used when user needs to
manipulate dask collections with :ref:`Mars remote API <remote>` or other
features. It converts dask collections like delayed or dask-dataframe to Mars Objects,
which can be considered as results returned by :meth:`mars.remote.spawn`.

.. code-block:: python

    >>> import dask
    >>> import mars.remote as mr
    >>> from mars.contrib.dask import convert_dask_collection
    >>>
    >>> def inc(x):
    ...     return x + 1
    ...
    >>> dask_task = dask.delayed(inc)(1)
    >>> mars_obj = convert_dask_collection(dask_task)  # Convert Dask object to Mars object
    >>> mars_task = mr.spawn(inc, args=(mars_obj,))
    >>> mars_task
    Object <op=RemoteFunction, key=14a77b28d32904002829b2e8c6474b56>
    >>> mars_task.execute().fetch()
    3

Dask-on-Mars is an ongoing project. Please open an issue if you find that one of
these dask functionalities doesn’t run on Mars.
17 changes: 17 additions & 0 deletions docs/source/user_guide/contrib/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
============
Mars Contrib
============

Mars compatible module provides contributed functionalities.

Dask on Mars
------------

Dask on Mars provides a simple way to execute the entire Dask ecosystem on top of Mars.

Further information on any specific method can be obtained in the :ref:`API reference <dask_api>`.

.. toctree::
   :maxdepth: 1

   dask
1 change: 1 addition & 0 deletions docs/source/user_guide/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ User Guide
dataframe/index
learn/index
remote/index
contrib/index
13 changes: 13 additions & 0 deletions mars/contrib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
28 changes: 28 additions & 0 deletions mars/contrib/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# noinspection PyUnresolvedReferences


def raise_dask_required_error(*_, **__):
    # Accept any arguments so that calling the placeholder the same way as the
    # real functions raises ImportError rather than a TypeError.
    raise ImportError('you need to have dask installed for this to work')


try:
    import dask
except ImportError:
    convert_dask_collection = mars_scheduler = raise_dask_required_error
else:
    from .converter import convert_dask_collection
    from .scheduler import mars_scheduler
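
The optional-dependency fallback used in this module can be sketched in isolation. The snippet below is an illustration only: the helper names and the package name `hypothetical_missing_pkg` are made up, and the stub accepts arbitrary arguments so that it raises `ImportError` however callers invoke it.

```python
import importlib


def make_missing_dependency_stub(package_name):
    """Return a callable that raises ImportError whatever arguments it gets."""

    def stub(*args, **kwargs):
        raise ImportError(
            f"you need to have {package_name} installed for this to work"
        )

    return stub


def load_optional(package_name):
    """Return the imported module, or None when it is not installed."""
    try:
        return importlib.import_module(package_name)
    except ImportError:
        return None


# "hypothetical_missing_pkg" is a made-up name used to trigger the fallback.
module = load_optional("hypothetical_missing_pkg")
if module is None:
    convert = make_missing_dependency_stub("hypothetical_missing_pkg")
else:
    convert = module.convert
```

With this layout, importing the package never fails; the error surfaces only when a user actually calls a function that needs the missing dependency.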
50 changes: 50 additions & 0 deletions mars/contrib/dask/converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dask import is_dask_collection, optimize

from .scheduler import mars_dask_get
from .utils import reduce


def convert_dask_collection(dc):
    """
    Convert dask collection object into mars.core.Object via remote API

    Parameters
    ----------
    dc: dask collection
        Dask collection object to be converted.

    Returns
    -------
    Object
        Mars Object.
    """
    if not is_dask_collection(dc):
        raise TypeError(f"'{type(dc).__name__}' object is not a valid dask collection")

    # Validate the original graph, then work on the optimized one.
    dc.__dask_graph__().validate()
    dsk = optimize(dc)[0].__dask_graph__()

    # The first key of the graph decides which keys are treated as outputs.
    first_key = next(iter(dsk.keys()))
    if isinstance(first_key, str):
        key = [first_key]
    elif isinstance(first_key, tuple):
        # Collect every key sharing the first key's name, ordered by index.
        key = sorted([i for i in dsk.keys() if i[0] == first_key[0]], key=lambda x: x[1])
    else:
        raise ValueError(
            f"Dask collection object seems to be broken, with unexpected key type: '{type(first_key).__name__}'")

    return reduce(mars_dask_get(dsk, [key]))
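
The key-selection step in `convert_dask_collection` can be tried out on plain dictionaries, without Dask or Mars installed. The sketch below mirrors that logic under the assumption that a graph behaves like an ordered mapping; `select_output_keys` is a hypothetical helper name, and the `isinstance(k, tuple)` guard is a small defensive addition that is not in the code above.

```python
def select_output_keys(dsk):
    """Pick the output keys of a graph, following the first key's shape."""
    first_key = next(iter(dsk))
    if isinstance(first_key, str):
        # A single named result, e.g. a delayed object.
        return [first_key]
    if isinstance(first_key, tuple):
        # Chunked collections use (name, index) keys: keep every key that
        # shares the first key's name and order them by index.
        name = first_key[0]
        matching = (k for k in dsk if isinstance(k, tuple) and k[0] == name)
        return sorted(matching, key=lambda k: k[1])
    raise ValueError(
        f"unexpected key type: '{type(first_key).__name__}'"
    )


# Task values are irrelevant for key selection, so None is used as a placeholder.
chunked = {("x", 1): None, ("x", 0): None, ("y", 0): None}
single = {"inc-123": None}

chunked_keys = select_output_keys(chunked)
single_keys = select_output_keys(single)
```

Note that the result depends on which key happens to come first in the mapping, so the approach relies on the optimized graph listing an output key first.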