From 901d5fa5ff25dfdcd2b97708967edd6b78e88e41 Mon Sep 17 00:00:00 2001 From: Elijah ben Izzy Date: Wed, 17 May 2023 16:39:49 -0400 Subject: [PATCH] Fixes #161 - adds node name to hamilton dask DAG (#163) Dask was taking the name of the wrapped function rather than the name of the actual Hamilton node. To keep the names unique, we use dask's recommended tokenize function to hash the inputs into a unique string. We use that hash for both name and dask_key_name because our functions depend only on their inputs and aren't called multiple times, so it is safe to reuse the same hash value in both places. --------- Co-authored-by: Stefan Krawczyk --- hamilton/experimental/h_dask.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/hamilton/experimental/h_dask.py b/hamilton/experimental/h_dask.py index 811c01c44..0a9295810 100644 --- a/hamilton/experimental/h_dask.py +++ b/hamilton/experimental/h_dask.py @@ -6,6 +6,7 @@ import numpy as np import pandas as pd from dask import compute +from dask.base import tokenize from dask.delayed import Delayed, delayed from dask.distributed import Client as DaskClient @@ -113,7 +114,14 @@ def execute_node(self, node: node.Node, kwargs: typing.Dict[str, typing.Any]) -> :param kwargs: the arguments that should be passed to it. :return: returns a dask delayed object. """ - return delayed(node.callable)(**kwargs) + # we want to ensure the name in dask corresponds to the node name, and not the wrapped + # function name that hamilton might have wrapped it with. + hash = tokenize(kwargs) # this is what the dask docs recommend. + name = node.name + hash + dask_key_name = str(node.name) + "_" + hash + return delayed(node.callable, name=name)( + **kwargs, dask_key_name=dask_key_name # this is what shows up in the dask console + ) def build_result(self, **outputs: typing.Dict[str, typing.Any]) -> typing.Any: """Builds the result and brings it back to this running process.