From a2524c50ae97f9648a74bb6468c94a8c163142bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Oct 2024 15:40:06 +0100 Subject: [PATCH 1/2] Split shared cache in backend and frontend --- executorlib/backend/cache_parallel.py | 2 +- executorlib/backend/cache_serial.py | 4 +- executorlib/cache/backend.py | 64 +++++++++++++++++++++++++++ executorlib/shared/cache.py | 63 +------------------------- tests/test_cache_shared.py | 2 +- 5 files changed, 70 insertions(+), 65 deletions(-) create mode 100644 executorlib/cache/backend.py diff --git a/executorlib/backend/cache_parallel.py b/executorlib/backend/cache_parallel.py index 0d80b304..b0d42f98 100644 --- a/executorlib/backend/cache_parallel.py +++ b/executorlib/backend/cache_parallel.py @@ -3,7 +3,7 @@ import cloudpickle -from executorlib.shared.cache import backend_load_file, backend_write_file +from executorlib.cache.backend import backend_load_file, backend_write_file def main() -> None: diff --git a/executorlib/backend/cache_serial.py b/executorlib/backend/cache_serial.py index 55200c8a..daf5a520 100644 --- a/executorlib/backend/cache_serial.py +++ b/executorlib/backend/cache_serial.py @@ -1,6 +1,6 @@ import sys -from executorlib.shared.cache import execute_task_in_file +from executorlib.cache.backend import backend_execute_task_in_file if __name__ == "__main__": - execute_task_in_file(file_name=sys.argv[1]) + backend_execute_task_in_file(file_name=sys.argv[1]) diff --git a/executorlib/cache/backend.py b/executorlib/cache/backend.py new file mode 100644 index 00000000..b6d73f42 --- /dev/null +++ b/executorlib/cache/backend.py @@ -0,0 +1,64 @@ +import os +from typing import Any + +from executorlib.shared.hdf import dump, load +from executorlib.shared.cache import FutureItem + + +def backend_load_file(file_name: str) -> dict: + """ + Load the data from an HDF5 file and convert FutureItem objects to their results. + + Args: + file_name (str): The name of the HDF5 file. + + Returns: + dict: The loaded data from the file. + + """ + apply_dict = load(file_name=file_name) + apply_dict["args"] = [ + arg if not isinstance(arg, FutureItem) else arg.result() + for arg in apply_dict["args"] + ] + apply_dict["kwargs"] = { + key: arg if not isinstance(arg, FutureItem) else arg.result() + for key, arg in apply_dict["kwargs"].items() + } + return apply_dict + + +def backend_write_file(file_name: str, output: Any) -> None: + """ + Write the output to an HDF5 file. + + Args: + file_name (str): The name of the HDF5 file. + output (Any): The output to be written. + + Returns: + None + + """ + file_name_out = os.path.splitext(file_name)[0] + os.rename(file_name, file_name_out + ".h5ready") + dump(file_name=file_name_out + ".h5ready", data_dict={"output": output}) + os.rename(file_name_out + ".h5ready", file_name_out + ".h5out") + + +def backend_execute_task_in_file(file_name: str) -> None: + """ + Execute the task stored in a given HDF5 file. + + Args: + file_name (str): The file name of the HDF5 file as an absolute path. + + Returns: + None + """ + apply_dict = backend_load_file(file_name=file_name) + result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"]) + backend_write_file( + file_name=file_name, + output=result, + ) diff --git a/executorlib/shared/cache.py b/executorlib/shared/cache.py index 13f493b6..155b1449 100644 --- a/executorlib/shared/cache.py +++ b/executorlib/shared/cache.py @@ -4,10 +4,10 @@ import subprocess import sys from concurrent.futures import Future -from typing import Any, Tuple +from typing import Tuple from executorlib.shared.command import get_command_path -from executorlib.shared.hdf import dump, get_output, load +from executorlib.shared.hdf import dump, get_output from executorlib.shared.serialize import serialize_funct_h5 @@ -47,47 +47,6 @@ def done(self) -> bool: return get_output(file_name=self._file_name)[0] -def backend_load_file(file_name: str) -> dict: - """ - Load the data from an HDF5 file and convert FutureItem objects to their results. - - Args: - file_name (str): The name of the HDF5 file. - - Returns: - dict: The loaded data from the file. - - """ - apply_dict = load(file_name=file_name) - apply_dict["args"] = [ - arg if not isinstance(arg, FutureItem) else arg.result() - for arg in apply_dict["args"] - ] - apply_dict["kwargs"] = { - key: arg if not isinstance(arg, FutureItem) else arg.result() - for key, arg in apply_dict["kwargs"].items() - } - return apply_dict - - -def backend_write_file(file_name: str, output: Any) -> None: - """ - Write the output to an HDF5 file. - - Args: - file_name (str): The name of the HDF5 file. - output (Any): The output to be written. - - Returns: - None - - """ - file_name_out = os.path.splitext(file_name)[0] - os.rename(file_name, file_name_out + ".h5ready") - dump(file_name=file_name_out + ".h5ready", data_dict={"output": output}) - os.rename(file_name_out + ".h5ready", file_name_out + ".h5out") - - def execute_in_subprocess( command: list, task_dependent_lst: list = [] ) -> subprocess.Popen: @@ -180,24 +139,6 @@ def execute_tasks_h5( } -def execute_task_in_file(file_name: str) -> None: - """ - Execute the task stored in a given HDF5 file. - - Args: - file_name (str): The file name of the HDF5 file as an absolute path. - - Returns: - None - """ - apply_dict = backend_load_file(file_name=file_name) - result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"]) - backend_write_file( - file_name=file_name, - output=result, - ) - - def _get_execute_command(file_name: str, cores: int = 1) -> list: """ Get command to call backend as a list of two strings diff --git a/tests/test_cache_shared.py b/tests/test_cache_shared.py index ccc16b49..58ad66d2 100644 --- a/tests/test_cache_shared.py +++ b/tests/test_cache_shared.py @@ -6,10 +6,10 @@ try: from executorlib.shared.cache import ( - FutureItem, execute_task_in_file, _check_task_output, ) + from executorlib.cache.backend import FutureItem from executorlib.shared.hdf import dump from executorlib.shared.serialize import serialize_funct_h5 From bc953fd65892c9349817dd88c028938746c50de8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 27 Oct 2024 14:40:26 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/cache/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/cache/backend.py b/executorlib/cache/backend.py index b6d73f42..0cc420d0 100644 --- a/executorlib/cache/backend.py +++ b/executorlib/cache/backend.py @@ -1,8 +1,8 @@ import os from typing import Any -from executorlib.shared.hdf import dump, load from executorlib.shared.cache import FutureItem +from executorlib.shared.hdf import dump, load def backend_load_file(file_name: str) -> dict: