Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task Annotations #2180

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
55d69bc
Task Annotations
sjperkins Aug 12, 2018
3f79c2d
First stab at task annotations in update_graph
sjperkins Sep 19, 2018
1fb21b3
[skip ci] Use has_key instead of try/except KeyError
sjperkins Sep 20, 2018
0f05d1d
[skip ci] improve tests
sjperkins Sep 20, 2018
d62495f
[skip ci] Add multiple worker test case
sjperkins Sep 20, 2018
0248978
[skip ci] Handle nested tasks, badly
sjperkins Sep 20, 2018
16bfa46
[skip ci] Split out xfail test
sjperkins Sep 20, 2018
6acbcf3
[skip ci] Provide reasons for xfail
sjperkins Sep 20, 2018
083d45f
Shift nested tasks from dumps_task to _deserialize
sjperkins Sep 20, 2018
ce8935a
temporary distributed TaskAnnotation type
sjperkins Sep 24, 2018
4b0a03d
Handle task complexity in kwargs
sjperkins Sep 25, 2018
7732607
Merge branch 'master' into task-annotations
sjperkins Nov 8, 2019
48f99ca
black linting
sjperkins Nov 8, 2019
f661bd9
Fix complex task handling
sjperkins Nov 8, 2019
03396c7
black linting
sjperkins Nov 11, 2019
d015412
Revert convert_kwargs_to_str change
sjperkins Nov 11, 2019
416a910
Merge branch 'master' into task-annotations
sjperkins Nov 11, 2019
505624d
simplify annotation pickle load
sjperkins Nov 11, 2019
0b34dd3
Merge branch 'master' into task-annotations
sjperkins Jun 17, 2020
f8f647d
[skip ci] checkpoint
sjperkins Jun 18, 2020
8ad373a
Merge branch 'master' into task-annotations
sjperkins Jun 18, 2020
dba3053
remove commented out code
sjperkins Jun 19, 2020
2d25feb
Depend on dask task-objects branch
sjperkins Jun 19, 2020
3f5f23f
fix repo
sjperkins Jun 19, 2020
6b9ff7b
Remove pprints and pdb
sjperkins Jun 19, 2020
a6c8293
fix
sjperkins Jun 19, 2020
c9626d2
Merge branch 'master' into task-annotations
sjperkins Jul 8, 2020
ec700a7
Merge branch 'master' into task-annotations
sjperkins Jul 13, 2020
198c29e
Bump delay test to account for startup
sjperkins Jul 14, 2020
31164e8
Apply black formatter
sjperkins Jul 14, 2020
5de3afc
Merge branch 'master' into task-annotations
sjperkins Jul 15, 2020
910b329
Revert "Bump delay test to account for startup"
sjperkins Jul 15, 2020
69be753
Merge branch 'master' into task-annotations
sjperkins Jul 24, 2020
f6efb1b
Merge branch 'master' into task-annotations
sjperkins Aug 1, 2020
a6fb3a3
Merge branch 'master' into task-annotations
sjperkins Aug 18, 2020
a863b50
Merge branch 'master' into task-annotations
sjperkins Aug 30, 2020
c3ad3e3
Merge branch 'master' of github.com:dask/distributed into task-annota…
sjperkins Sep 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion continuous_integration/environment-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ dependencies:
- fsspec
- pip
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/sjperkins/dask@task-objects#egg=dask
- git+https://github.com/joblib/joblib.git
- git+https://github.com/dask/zict
13 changes: 7 additions & 6 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from dask.core import flatten, get_dependencies
from dask.optimization import SubgraphCallable
from dask.compatibility import apply
from dask.task import Task
from dask.utils import ensure_dict, format_bytes, funcname

from tlz import first, groupby, merge, valmap, keymap, partition_all
Expand Down Expand Up @@ -1572,10 +1573,7 @@ def submit(
restrictions = {}
loose_restrictions = []

if kwargs:
dsk = {skey: (apply, func, list(args), kwargs)}
else:
dsk = {skey: (func,) + tuple(args)}
dsk = {skey: Task(func, list(args), kwargs if kwargs else None)}

futures = self._graph_to_futures(
dsk,
Expand Down Expand Up @@ -1737,7 +1735,9 @@ def map(
)

if not kwargs:
dsk = {key: (func,) + args for key, args in zip(keys, zip(*iterables))}
dsk = {
key: Task(func, list(args)) for key, args in zip(keys, zip(*iterables))
}
else:
kwargs2 = {}
dsk = {}
Expand All @@ -1750,7 +1750,7 @@ def map(
kwargs2[k] = v
dsk.update(
{
key: (apply, func, (tuple, list(args)), kwargs2)
key: Task(func, list(args), kwargs2)
for key, args in zip(keys, zip(*iterables))
}
)
Expand Down Expand Up @@ -2576,6 +2576,7 @@ def _graph_to_futures(
for k, v in dsk.items()
if isinstance(v, Future) and k not in keyset
}

if values:
dsk = subs_multiple(dsk, values)

Expand Down
6 changes: 5 additions & 1 deletion distributed/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from collections import defaultdict, deque
from collections import defaultdict, deque, namedtuple
from contextlib import suppress
from enum import Enum
from functools import partial
Expand Down Expand Up @@ -40,6 +40,10 @@
)
from . import protocol

# TODO(sjperkins)
# Depend on dask.core definition eventually
TaskAnnotation = namedtuple("TaskAnnotation", ["annotation"])


class Status(Enum):
"""
Expand Down
20 changes: 9 additions & 11 deletions distributed/recreate_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ async def _get_futures_error(self, future):
out = await self.scheduler.cause_of_failure(keys=[f.key for f in futures])
deps, task = out["deps"], out["task"]
if isinstance(task, dict):
function, args, kwargs = _deserialize(**task)
return (function, args, kwargs, deps)
task = _deserialize(**task)
return (task, deps)
else:
function, args, kwargs = _deserialize(task=task)
return (function, args, kwargs, deps)
task = _deserialize(task=task)
return (task, deps)

def get_futures_error(self, future):
"""
Expand All @@ -107,9 +107,7 @@ def get_futures_error(self, future):
-------

Tuple:
- the function that raised an exception
- argument list (a tuple), may include values and keys
- keyword arguments (a dictionary), may include values and keys
- Task that raised the exception
- list of keys that the function requires to be fetched to run

See Also
Expand All @@ -121,12 +119,12 @@ def get_futures_error(self, future):
async def _recreate_error_locally(self, future):
    # Wait for the failed future to settle, then fetch the offending task
    # together with the keys of its dependencies.
    await wait(future)
    task, deps = await self._get_futures_error(future)
    # Gather the dependency data so the failing task can be re-executed
    # locally with concrete values substituted for keys.
    dep_futures = self.client._graph_to_futures({}, deps)
    dep_data = await self.client._gather(dep_futures)
    packed_args = pack_data(task.args, dep_data)
    packed_kwargs = pack_data(task.kwargs, dep_data)
    return (task.function, packed_args, packed_kwargs)

def recreate_error_locally(self, future):
"""
Expand Down
40 changes: 40 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,12 @@ def update_graph(

dependencies = dependencies or {}

priority = priority or {}
restrictions = restrictions or {}
resources = resources or {}
retries = retries or {}
loose_restrictions = loose_restrictions or []

n = 0
while len(tasks) != n: # walk through new tasks, cancel any bad deps
n = len(tasks)
Expand All @@ -1863,6 +1869,39 @@ def update_graph(
self.report({"op": "cancelled-key", "key": k}, client=client)
self.client_releases_keys(keys=[k], client=client)

annotations = {}

# Extract any annotations relating to existing update_graph interfaces
# https://stackoverflow.com/a/20308657/1611416
for k, task in tasks.items():
# This is probably a nested task
if not isinstance(task, dict):
continue

if "annotations" not in task:
continue

annotations[k] = annotation = pickle.loads(task["annotations"])

if "priority" in annotation:
priority[k] = annotation["priority"]

if "worker" in annotation:
worker = annotation["worker"]
if not isinstance(worker, (list, tuple)):
worker = [worker]

restrictions[k] = worker

if annotation.get("allow_other_workers", False):
loose_restrictions.append(k)

if "retries" in annotation:
retries[k] = annotation["retries"]

if "resources" in annotation:
resources[k] = annotation["resources"]

# Remove any self-dependencies (happens on test_publish_bag() and others)
for k, v in dependencies.items():
deps = set(v)
Expand Down Expand Up @@ -2032,6 +2071,7 @@ def update_graph(
self,
client=client,
tasks=tasks,
annotations=annotations,
keys=keys,
restrictions=restrictions or {},
dependencies=dependencies,
Expand Down
63 changes: 55 additions & 8 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,53 @@ async def test_get(c, s, a, b):
assert result == 3


@gen_cluster(client=True, timeout=None)
async def test_task_annotations(c, s, a, b):
    """Annotations attached via ``dask.task.annotate`` are honoured by the scheduler.

    Covers priority, single-worker restriction, multi-worker restriction, and
    loose restrictions (``allow_other_workers``) with a non-existent worker.
    Uses the modern async/await test style used by the rest of this module
    (the original used the legacy generator ``yield`` style) and drops the
    unused ``Task`` import.
    """
    from dask.task import annotate

    # Test priority
    dsk = {"x": (annotate(inc, {"priority": 1}), 1)}
    result = await c.get(dsk, "x", sync=False)
    assert result == 2

    # Test specifying a worker
    dsk = {"y": (annotate(inc, {"worker": a.address}), 1)}
    result = await c.get(dsk, "y", sync=False)

    assert s.who_has["y"] == {a.address}
    assert result == 2

    # Test specifying multiple workers
    dsk = {"w": (annotate(inc, {"worker": [a.address, b.address]}), 1)}
    result = await c.get(dsk, "w", sync=False)

    assert len(s.who_has["w"].intersection({a.address, b.address})) > 0
    assert result == 2

    # Test specifying a non-existent worker with loose restrictions
    dsk = {
        "z": (
            annotate(inc, {"worker": "tcp://2.2.2.2/", "allow_other_workers": True}),
            1,
        )
    }
    result = await c.get(dsk, "z", sync=False)

    assert len(s.who_has["z"].intersection({a.address, b.address})) > 0
    assert result == 2


@gen_cluster(client=True, timeout=None)
async def test_nested_task_annotations(c, s, a, b):
    """An annotated callable inside a nested task must still be routed correctly.

    The inner ``(inc, 1)`` task is unannotated; the outer annotated ``inc``
    should land on worker ``a``. Converted from the legacy generator
    ``yield`` style to the async/await style used by the rest of this module;
    the unused ``Task`` import is dropped.
    """
    from dask.task import annotate

    dsk = {"v": (annotate(inc, {"worker": a.address}), (inc, 1))}

    result = await c.get(dsk, "v", sync=False)
    assert s.who_has["v"] == {a.address}
    assert result == 3


def test_get_sync(c):
    # Synchronous get: a trivial single-task graph evaluates to inc(1).
    dsk = {"x": (inc, 1)}
    result = c.get(dsk, "x")
    assert result == 2

Expand Down Expand Up @@ -4511,11 +4558,11 @@ async def test_get_future_error_simple(c, s, a, b):
await wait(f)
assert f.status == "error"

function, args, kwargs, deps = await c._get_futures_error(f)
task, deps = await c._get_futures_error(f)
# args contains only solid values, not keys
assert function.__name__ == "div"
assert task.function.__name__ == "div"
with pytest.raises(ZeroDivisionError):
function(*args, **kwargs)
task.function(*task.args, **task.kwargs)


@gen_cluster(client=True)
Expand All @@ -4530,9 +4577,9 @@ async def test_get_futures_error(c, s, a, b):
await wait(f)
assert f.status == "error"

function, args, kwargs, deps = await c._get_futures_error(f)
assert function.__name__ == "div"
assert args == (1, y0.key)
task, deps = await c._get_futures_error(f)
assert task.function.__name__ == "div"
assert task.args == [1, y0.key]


@gen_cluster(client=True)
Expand All @@ -4550,7 +4597,7 @@ async def test_recreate_error_delayed(c, s, a, b):
function, args, kwargs = await c._recreate_error_locally(f)
assert f.status == "error"
assert function.__name__ == "div"
assert args == (1, 0)
assert args == [1, 0]
with pytest.raises(ZeroDivisionError):
function(*args, **kwargs)

Expand All @@ -4569,7 +4616,7 @@ async def test_recreate_error_futures(c, s, a, b):
function, args, kwargs = await c._recreate_error_locally(f)
assert f.status == "error"
assert function.__name__ == "div"
assert args == (1, 0)
assert args == [1, 0]
with pytest.raises(ZeroDivisionError):
function(*args, **kwargs)

Expand Down
28 changes: 28 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,34 @@ def test_dumps_task():
assert cloudpickle.loads(d["args"]) == (1,)
assert set(d) == {"function", "args"}

from dask.task import Task, annotate
from distributed.core import TaskAnnotation as TA
from distributed.protocol.serialize import Serialize

a = {"worker": "alice"}

d = dumps_task((annotate(inc, a), 1))
assert cloudpickle.loads(d["function"])(1) == 2
assert cloudpickle.loads(d["args"]) == [1]
assert cloudpickle.loads(d["annotations"]) == a

d = dumps_task((apply, annotate(f, a), (1,), {"y": 10}))
assert cloudpickle.loads(d["function"])(1, 2) == 3
assert cloudpickle.loads(d["args"]) == (1,)
assert cloudpickle.loads(d["kwargs"]) == {"y": 10}
assert cloudpickle.loads(d["annotations"]) == a

d = dumps_task((inc, (inc, 1)))
assert isinstance(d["task"], Serialize)
assert d["task"].data == Task.from_spec((inc, (inc, 1)))
assert "annotations" not in d

func = (apply, annotate(f, a), (inc, 1), {"y": 10})
d = dumps_task(func)
assert isinstance(d["task"], Serialize)
assert d["task"].data == Task.from_spec(func)
assert cloudpickle.loads(d["annotations"]) == a


@gen_cluster()
async def test_ready_remove_worker(s, a, b):
Expand Down
16 changes: 13 additions & 3 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import dask
from dask import istask
from dask.task import Task, TupleTask, spec_type

# provide format_bytes here for backwards compatibility
from dask.utils import ( # noqa
Expand Down Expand Up @@ -779,12 +780,21 @@ def _maybe_complex(task):


def convert(task, dsk, extra_values):
if type(task) is list:
task_type = spec_type(task)

if task_type is list:
return [convert(v, dsk, extra_values) for v in task]
if type(task) is dict:
if task_type is dict:
return {k: convert(v, dsk, extra_values) for k, v in task.items()}
if istask(task):
if task_type is TupleTask:
return (task[0],) + tuple(convert(x, dsk, extra_values) for x in task[1:])
if task_type is Task:
return Task(
task.function,
convert(task.args, dsk, extra_values),
convert(task.kwargs, dsk, extra_values),
task.annotations,
)
try:
if task in dsk or task in extra_values:
return tokey(task)
Expand Down
Loading