Ensures parallelism has the ability to garbage collect
This deletes unused references in parallelism/dynamism.
That said, the Python garbage collector occasionally has to be told what
to do, so the user should be aware.

Fixes #373
elijahbenizzy committed Sep 22, 2023
1 parent 5ab6360 commit 5b60218
Showing 4 changed files with 63 additions and 2 deletions.
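
A note on the commit message: "telling the garbage collector what to do" here means adding an explicit gc.collect() inside a memory-heavy pipeline node. A minimal sketch of that pattern, modeled on the test added in this commit (node names are hypothetical):

    import gc

    import numpy as np

    from hamilton.htypes import Collect, Parallelizable


    def item() -> Parallelizable[int]:
        for i in range(100):
            yield i


    def big_intermediate(item: int) -> np.ndarray:
        # hypothetical memory-heavy per-item step
        return np.ones(1_000_000)


    def summarized(big_intermediate: np.ndarray) -> int:
        summary = int(big_intermediate.sum() > 0)
        gc.collect()  # explicitly reclaim the large array before the next item runs
        return summary


    def total(summarized: Collect[int]) -> int:
        return sum(summarized)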
11 changes: 10 additions & 1 deletion hamilton/execution/executors.py
@@ -97,12 +97,21 @@ def base_execute_task(task: TaskImplementation) -> Dict[str, Any]:
         if not getattr(node_, "callable_modified", False):
             node_._callable = _modify_callable(node_.node_role, node_.callable)
         setattr(node_, "callable_modified", True)
-    return execute_subdag(
+    out = execute_subdag(
         nodes=task.nodes,
         inputs=task.dynamic_inputs,
         adapter=task.adapters[0],  # TODO -- wire through multiple graph adapters
         overrides={**task.dynamic_inputs, **task.overrides},
     )
+    # This selection is for GC
+    # We also need to get the override values
+    # This way, if it's overridden, we can ensure it gets passed to the right one
+    final_retval = {
+        key: value
+        for key, value in out.items()
+        if key in task.outputs_to_compute or key in task.overrides
+    }
+    return final_retval
 
 
 class SynchronousLocalTaskExecutor(TaskExecutor):
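
Why the change above helps: base_execute_task previously returned every value computed in the sub-DAG, so the caller held references to large intermediates long after they were needed. Filtering down to the requested outputs (plus overrides, so overridden values still reach their consumers) drops the last reference to everything else, and CPython's reference counting can reclaim it immediately. A self-contained sketch of the idea (function and key names are illustrative, not Hamilton's API):

    def filter_outputs(results: dict, outputs_to_compute: set, overrides: dict) -> dict:
        # Keeping only the requested keys drops this dict's references to the rest;
        # once nothing else refers to them, CPython frees them right away.
        return {
            key: value
            for key, value in results.items()
            if key in outputs_to_compute or key in overrides
        }


    results = {"big_intermediate": bytearray(10**8), "answer": 42}
    slim = filter_outputs(results, outputs_to_compute={"answer"}, overrides={})
    del results  # the 100 MB bytearray now has no referents and is freed
    assert slim == {"answer": 42}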
1 change: 1 addition & 0 deletions hamilton/execution/grouping.py
@@ -363,6 +363,7 @@ def create_task_plan(
                 break
             if is_output:
                 outputs.append(node_.name)
+
         task_spec = TaskSpec(
             base_id=node_group.base_id,
             spawning_task_base_id=node_group.spawning_task_base_id,
2 changes: 1 addition & 1 deletion hamilton/version.py
@@ -1 +1 @@
-VERSION = (1, 29, 0)
+VERSION = (1, 29, 1, "rc0")
51 changes: 51 additions & 0 deletions tests/execution/test_executors.py
@@ -1,7 +1,10 @@
+import os
 import time
 
+import numpy as np
 import pytest
 
+import hamilton.ad_hoc_utils
 from hamilton import base, driver
 from hamilton.execution.executors import (
     DefaultExecutionManager,
@@ -17,6 +20,7 @@
     NodeGroupPurpose,
     TaskImplementation,
 )
+from hamilton.htypes import Collect, Parallelizable
 from tests.resources.dynamic_parallelism import (
     inputs_in_collect,
     no_parallel,
@@ -256,3 +260,50 @@ def test_end_to_end_parallelizable_with_overrides_on_collect_node():
         .with_remote_executor(SynchronousLocalTaskExecutor())
     ).build()
     assert dr.execute(["collect_plus_one"], overrides={"collect": 100})["collect_plus_one"] == 101
+
+
+@pytest.mark.parametrize("executor_factory", [SynchronousLocalTaskExecutor])
+@pytest.mark.skipif(
+    os.environ.get("CI") != "true",
+    reason="This test tests memory usage and takes quite a while to run. "
+    "We don't run it locally as it's a low-touch part of the codebase. "
+    "TODO -- actually measure the memory allocated/remaining",
+)
+def test_sequential_would_use_too_much_memory_no_garbage_collector(executor_factory):
+    NUM_REPS = 100
+
+    def foo() -> Parallelizable[int]:
+        for i in range(NUM_REPS):
+            yield i
+
+    def large_allocation(foo: int) -> np.array:
+        size = int(1_073_741_824)  # 1 GB in bytes
+        return np.ones(size)
+
+    def compressed(large_allocation: np.array) -> int:
+        import gc
+
+        gc.collect()
+        return 1
+
+    # This is a hack due to Python's garbage collector
+    def gc(compressed: int) -> int:
+        return compressed
+
+    def concatenated(gc: Collect[int]) -> int:
+        return sum(gc)
+
+    mod = hamilton.ad_hoc_utils.create_temporary_module(
+        foo, large_allocation, compressed, gc, concatenated
+    )
+
+    dr = (
+        driver.Builder()
+        .with_modules(mod)
+        .enable_dynamic_execution(allow_experimental_mode=True)
+        .with_remote_executor(executor_factory())
+        .with_grouping_strategy(GroupByRepeatableBlocks())
+        .build()
+    )
+    result = dr.execute(["concatenated"])
+    assert result["concatenated"] == NUM_REPS

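The skip reason's TODO mentions actually measuring memory. One possible approach (an assumption, not part of this commit) is to assert on peak RSS via the standard-library resource module, which is POSIX-only:

    import resource
    import sys


    def peak_rss_bytes() -> int:
        # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
        peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        return peak if sys.platform == "darwin" else peak * 1024


    # After dr.execute(...), assert the peak stayed near one allocation's worth,
    # rather than NUM_REPS allocations' worth, e.g.:
    # assert peak_rss_bytes() < 3 * single_allocation_bytes  # hypothetical bound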