Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add caching for hamilton #11

Closed
HamiltonRepoMigrationBot opened this issue Feb 26, 2023 · 4 comments
Closed

Add caching for hamilton #11

HamiltonRepoMigrationBot opened this issue Feb 26, 2023 · 4 comments
Labels
enhancement New feature or request migrated-from-old-repo Migrated from old repository product idea

Comments

@HamiltonRepoMigrationBot
Copy link
Collaborator

Issue by elijahbenizzy
Monday Oct 18, 2021 at 17:54 GMT
Originally opened as stitchfix/hamilton#17


The problem

We want to enable caching of functions and their downstream results.

Say we want to alter a function and rerun the entire DAG. The function that we want to alter runs late enough that we'd be redoing a significant amount of computation. While iterating could often be solved by executing individual nodes, its completely reasonable to iterate on the entire DAG.

In our internal use of Hamilton, we actually have a decorator called @cache that runs entirely separate from Hamilton -- this allows us to cache the results of individual functions. This decorator uses (a) the code a function runs and (b) the hash of the parameters. That said, its not foolproof -- changes in external libraries referenced within functions can get ignored, its not DAG-aware (it doesn't care about downstream functions you might also not want to rerun), and it depends on hashability of parameters.

I envision this as useful for:

  1. Rerunning/iterating on a DAG locally
  2. Running expensive DAGs in production ETLs that all use the same cache but change minimal parts

Some options

  1. Automatically cache functions, have a clear_cache or use_cache in the execute function
  2. Use the @cache decorator we have internally
  3. Manage the cache externally -- pass in as an override to the driver's execute function. Then have a method on the driver to manipulate the cache as needed.

I'm partial to (1) although we need to make it visible to the user and easy to override. E.G. to mark things as changed. We could also have decorators that say dont_cache if needed.

Probably a few other things we can do -- welcome feedback! Might want to think about making it pluggable -- saving to disk is nice, but saving it to a backing store could be even nicer. Shouldn't get locked in.

@HamiltonRepoMigrationBot
Copy link
Collaborator Author

Comment by judahrand
Monday Nov 01, 2021 at 09:20 GMT


Relying on the hash-ability of parameters can be dealt with in the way that Joblib does it - by using pickle.dumps to turn the parameter into bytes and then hashing the result. As of Pickle Protocol 5 (Python >=3.8 or with a backport) this is actually very fast even with large Numpy arrays or Pandas DataFrames.

Regarding hashing, I've personally also found that for large data inputs xxhash.xxh3_128 can be a useful speed up - though it does introduce an additional dependency.

The external dependencies of a function is still a problem, however.

@HamiltonRepoMigrationBot
Copy link
Collaborator Author

Comment by elijahbenizzy
Friday Feb 04, 2022 at 18:24 GMT


@judahrand yep! Nice callout. Sorry for the delay. I think a big question is what we want to cache... If we cache at the configuration side (E.G. all sync nodes to a function), then we can do a downstream trace to determine what to rerun. Similar to the approach spark takes. The nice thing about this is we don't need to worry about hashing -- as we're hashing configuration items and not datasets.

TBD on what the user interface would look like for this -- I think an external caching layer could work nicely, as we expose this capability in function_graph to some extent.

@HamiltonRepoMigrationBot
Copy link
Collaborator Author

Comment by skrawcz
Saturday Mar 19, 2022 at 22:14 GMT


I think caching should be controlled by the driver + adapter -- it's most useful when iterating/building a DAG. Once someone has this running in production, we already don't recompute by design, so not sure how much value it would be in production, other than for "resumeability" of workflows in case of error (e.g. like https://docs.ray.io/en/latest/workflows/concepts.html).

My motivating use case -- I'm iterating locally, and would like the earlier nodes to be cached, since I'm working on downstream ones. Setting that via the driver makes sense to me.

@zilto
Copy link
Collaborator

zilto commented Mar 3, 2024

now supported via hamilton.plugins.h_diskcache.DiskCacheAdapter

@zilto zilto closed this as completed Mar 3, 2024
elijahbenizzy added a commit that referenced this issue Sep 9, 2024
# This is the 1st commit message:

Update graph_functions.py

Describes what to do in `graph_functions.py`
# This is the commit message #2:

Adds comments to lifecycle base
# This is the commit message #3:

Update h_ray.py with comments for ray tracking compatibility
# This is the commit message #4:

Replicate previous error

# This is the commit message #5:

Inline function, unsure if catching errors and exceptions to be handadled differently

# This is the commit message #6:

BaseDoRemoteExecute has the added Callable function that snadwisched lifecycle hooks

# This is the commit message #7:

method fails, says AssertionError about ray.remote decorator

# This is the commit message #8:

simple script for now to check telemetry, execution yield the ray.remote AssertionError

# This is the commit message #9:

passing pointer through and arguments to lifecycle wrapper into ray.remote

# This is the commit message #10:

post-execute hook for node not called

# This is the commit message #11:

finally executed only when exception occurs, hamilton tracker not executed

# This is the commit message #12:

atexit.register does not work, node keeps running inui

# This is the commit message #13:

added stop() method, but doesn't get called

# This is the commit message #14:

Ray telemtry works for single node, problem with connected nodes

# This is the commit message #15:

Ray telemtry works for single node, problem with connected nodes

# This is the commit message #16:

Ray telemtry works for single node, problem with connected nodes

# This is the commit message #17:

Fixes ray object dereferencing

Ray does not resolve nested arguments:
https://docs.ray.io/en/latest/ray-core/objects.html#passing-object-arguments

So one option is to make them all top level:

- one way to do that is to make the other arguments not clash with any
possible user parameters -- hence the `__` prefix. This is what I did.
- another way would be in the ray adapter, wrap the incoming function,
and explicitly do a ray.get() on any ray object references in the
kwargs arguments. i.e. keep the nested structure, but when the ray
task starts way for all inputs... not sure which is best, but this
now works correctly.

# This is the commit message #18:

ray works checkpoint, pre-commit fixed

# This is the commit message #19:

fixed graph level telemtry proposal

# This is the commit message #20:

pinned ruff

# This is the commit message #21:

Correct output, added option to start ray cluster

# This is the commit message #22:

Unit test mimicks the DoNodeExecute unit test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request migrated-from-old-repo Migrated from old repository product idea
Projects
None yet
Development

No branches or pull requests

3 participants