Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to disable_dependencies for cache #477

Merged
merged 2 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def __new__(
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
disable_dependencies=disable_dependencies,
)
elif not disable_dependencies:
_check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory)
Expand Down
5 changes: 5 additions & 0 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
terminate_function: Optional[callable] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
disable_dependencies: bool = False,
):
"""
Initialize the FileExecutor.
Expand All @@ -45,6 +46,7 @@ def __init__(
terminate_function (callable, optional): The function to terminate the tasks.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str, optional): name of the backend used to spawn tasks.
disable_dependencies (boolean): Disable resolving future objects during the submission.
"""
super().__init__()
default_resource_dict = {
Expand All @@ -71,6 +73,7 @@ def __init__(
"terminate_function": terminate_function,
"pysqa_config_directory": pysqa_config_directory,
"backend": backend,
"disable_dependencies": disable_dependencies,
},
)
)
Expand All @@ -89,6 +92,7 @@ def create_file_executor(
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[callable] = None,
disable_dependencies: bool = False,
):
if cache_directory is None:
cache_directory = "executorlib_cache"
Expand All @@ -110,4 +114,5 @@ def create_file_executor(
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
disable_dependencies=disable_dependencies,
)
15 changes: 12 additions & 3 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def execute_tasks_h5(
terminate_function: Optional[callable] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
disable_dependencies: bool = False,
) -> None:
"""
Execute tasks stored in a queue using HDF5 files.
Expand Down Expand Up @@ -111,14 +112,22 @@ def execute_tasks_h5(
if task_key + ".h5out" not in os.listdir(cache_directory):
file_name = os.path.join(cache_directory, task_key + ".h5in")
dump(file_name=file_name, data_dict=data_dict)
if not disable_dependencies:
task_dependent_lst = [
process_dict[k] for k in future_wait_key_lst
]
else:
if len(future_wait_key_lst) > 0:
raise ValueError(
"Future objects are not supported as input if disable_dependencies=True."
)
task_dependent_lst = []
process_dict[task_key] = execute_function(
command=_get_execute_command(
file_name=file_name,
cores=task_resource_dict["cores"],
),
task_dependent_lst=[
process_dict[k] for k in future_wait_key_lst
],
task_dependent_lst=task_dependent_lst,
resource_dict=task_resource_dict,
config_directory=pysqa_config_directory,
backend=backend,
Expand Down
7 changes: 7 additions & 0 deletions tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ def test_executor_dependence_mixed(self):
self.assertEqual(fs2.result(), 4)
self.assertTrue(fs2.done())

def test_executor_dependence_error(self):
with self.assertRaises(ValueError):
with FileExecutor(
execute_function=execute_in_subprocess, disable_dependencies=True
) as exe:
exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2))

def test_executor_working_directory(self):
cwd = os.path.join(os.path.dirname(__file__), "executables")
with FileExecutor(
Expand Down
Loading