Fixes #161 - adds node name to hamilton dask DAG (#163)
Dask was using the name of the wrapped function rather than
the name of the actual Hamilton node. To keep names unique, we use
dask's recommended tokenize function to hash the inputs into
a unique string. We use that hash for both name and dask_key_name:
our functions depend only on their inputs and aren't called multiple
times, so reusing the same value in both places should be safe.

---------

Co-authored-by: Stefan Krawczyk <stefan@dagworks.io>
elijahbenizzy and skrawcz authored May 17, 2023
1 parent 415f09e commit 901d5fa
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion hamilton/experimental/h_dask.py
@@ -6,6 +6,7 @@
 import numpy as np
 import pandas as pd
 from dask import compute
+from dask.base import tokenize
 from dask.delayed import Delayed, delayed
 from dask.distributed import Client as DaskClient

@@ -113,7 +114,14 @@ def execute_node(self, node: node.Node, kwargs: typing.Dict[str, typing.Any]) ->
         :param kwargs: the arguments that should be passed to it.
         :return: returns a dask delayed object.
         """
-        return delayed(node.callable)(**kwargs)
+        # we want to ensure the name in dask corresponds to the node name, and not the wrapped
+        # function name that hamilton might have wrapped it with.
+        hash = tokenize(kwargs)  # this is what the dask docs recommend.
+        name = node.name + hash
+        dask_key_name = str(node.name) + "_" + hash
+        return delayed(node.callable, name=name)(
+            **kwargs, dask_key_name=dask_key_name  # this is what shows up in the dask console
+        )
 
     def build_result(self, **outputs: typing.Dict[str, typing.Any]) -> typing.Any:
         """Builds the result and brings it back to this running process.
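As a rough illustration of the naming scheme in this change, the sketch below builds a key of the form `<node name>_<token>` from a node name and its inputs. The helper `dask_key_for`, the node name `spend_per_signup`, and the sample inputs are hypothetical and not part of the commit. The `hashlib` fallback is only a stand-in so the snippet runs where dask is not installed; it does not reproduce dask's actual tokens.

```python
import hashlib
import typing

try:
    # The hashing helper the commit imports.
    from dask.base import tokenize
except ImportError:
    # Stand-in (an assumption for this sketch only) used when dask is absent.
    def tokenize(*args: typing.Any) -> str:
        return hashlib.md5(repr(args).encode()).hexdigest()


def dask_key_for(node_name: str, kwargs: typing.Dict[str, typing.Any]) -> str:
    """Hypothetical helper: node name plus a hash of the inputs, as in the diff."""
    return f"{node_name}_{tokenize(kwargs)}"


inputs = {"spend": 10, "signups": 2}
key_a = dask_key_for("spend_per_signup", inputs)
key_b = dask_key_for("spend_per_signup", dict(inputs))  # equal inputs, same key
key_c = dask_key_for("spend_per_signup", {"spend": 11, "signups": 2})  # different inputs

print(key_a)
print(key_a == key_b, key_a == key_c)
```

In the commit, a string built this way is passed as `dask_key_name` when the delayed callable is invoked, so the node name rather than the wrapper's function name is what appears in the dask console.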
