From 294965d67dae1470cfe7edcbb35297ba73b55207 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 15 Nov 2024 22:30:56 +0100 Subject: [PATCH] Change Backend Names (#500) * Change Backend Names * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update __init__.py * fix flux tests * fix flux submission test * fix input checks * fix notebooks * Add more tests --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- executorlib/__init__.py | 2 +- executorlib/cache/executor.py | 4 ++-- executorlib/interactive/executor.py | 14 +++++++++----- executorlib/standalone/inputcheck.py | 4 ++-- notebooks/examples.ipynb | 22 +++++++++++----------- tests/test_cache_executor_pysqa_flux.py | 2 +- tests/test_dependencies_executor.py | 4 ++++ tests/test_executor_backend_flux.py | 10 +++++----- tests/test_shared_input_check.py | 2 +- 9 files changed, 36 insertions(+), 28 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 3337653a..16358677 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -179,7 +179,7 @@ def __new__( resource_dict.update( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) - if "pysqa_" in backend and not plot_dependency_graph: + if "_submission" in backend and not plot_dependency_graph: from executorlib.cache.executor import create_file_executor return create_file_executor( diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index a2de5bf1..ad817915 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -81,7 +81,7 @@ def __init__( def create_file_executor( max_workers: int = 1, - backend: str = "pysqa_flux", + backend: str = "flux_submission", max_cores: int = 1, cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None, @@ -113,6 +113,6 @@ def create_file_executor( cache_directory=cache_directory, resource_dict=resource_dict, pysqa_config_directory=pysqa_config_directory, - backend=backend.split("pysqa_")[-1], + backend=backend.split("_submission")[0], disable_dependencies=disable_dependencies, ) diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 067dec50..8cfaa2c0 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -198,13 +198,13 @@ def create_executor( init_function (None): optional function to preset arguments for functions which are submitted later """ check_init_function(block_allocation=block_allocation, init_function=init_function) - if flux_executor is not None and backend != "flux": - backend = "flux" + if flux_executor is not None and backend != "flux_allocation": + backend = "flux_allocation" check_pmi(backend=backend, pmi=flux_executor_pmi_mode) cores_per_worker = resource_dict["cores"] resource_dict["cache_directory"] = cache_directory resource_dict["hostname_localhost"] = hostname_localhost - if backend == "flux": + if backend == "flux_allocation": check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"]) check_command_line_argument_lst( command_line_argument_lst=resource_dict["slurm_cmd_args"] @@ -233,7 +233,7 @@ def create_executor( executor_kwargs=resource_dict, spawner=FluxPythonSpawner, ) - elif backend == "slurm": + elif backend == "slurm_allocation": check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) if block_allocation: @@ -255,7 +255,7 @@ def create_executor( executor_kwargs=resource_dict, spawner=SrunSpawner, ) - else: # backend="local" + elif backend == "local": check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) check_threads_per_core(threads_per_core=resource_dict["threads_per_core"]) @@ -285,3 +285,7 @@ def create_executor( executor_kwargs=resource_dict, spawner=MpiExecSpawner, ) + else: + raise ValueError( + "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local." + ) diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index a78e2e8d..5abea87b 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -117,9 +117,9 @@ def check_pmi(backend: str, pmi: Optional[str]) -> None: """ Check if pmi is valid for the selected backend and raise a ValueError if it is not. """ - if backend != "flux" and pmi is not None: + if backend != "flux_allocation" and pmi is not None: raise ValueError("The pmi parameter is currently only implemented for flux.") - elif backend == "flux" and pmi not in ["pmix", "pmi1", "pmi2", None]: + elif backend == "flux_allocation" and pmi not in ["pmix", "pmi1", "pmi2", None]: raise ValueError( "The pmi parameter supports [pmix, pmi1, pmi2], but not: " + pmi ) diff --git a/notebooks/examples.ipynb b/notebooks/examples.ipynb index 70c90a86..1546dcb8 100644 --- a/notebooks/examples.ipynb +++ b/notebooks/examples.ipynb @@ -69,7 +69,7 @@ "source": [ "from executorlib import Executor\n", "\n", - "with Executor(max_cores=1, backend=\"flux\") as exe:\n", + "with Executor(max_cores=1, backend=\"flux_allocation\") as exe:\n", " future = exe.submit(sum, [1, 1])\n", " print(future.result())" ] @@ -103,7 +103,7 @@ " return sum(*args)\n", "\n", "\n", - "with Executor(max_cores=2, backend=\"flux\") as exe:\n", + "with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n", " fs_1 = exe.submit(calc, [2, 1])\n", " fs_2 = exe.submit(calc, [2, 2])\n", " fs_3 = exe.submit(calc, [2, 3])\n", @@ -159,7 +159,7 @@ " return sum(*args)\n", "\n", "\n", - "with Executor(max_cores=2, backend=\"flux\") as exe:\n", + "with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n", " print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))" ] }, @@ -205,7 +205,7 @@ " with Executor(\n", " # Resource definition on the executor level\n", " max_workers=2, # total number of cores available to the Executor\n", - " backend=\"flux\", # optional in case the backend is not recognized\n", + " backend=\"flux_allocation\", # optional in case the backend is not recognized\n", " # Optional resource definition\n", " resource_dict={\n", " \"cores\": 1,\n", @@ -277,7 +277,7 @@ " # Resource definition on the executor level\n", " max_cores=2, # total number of cores available to the Executor\n", " block_allocation=True, # reuse python processes\n", - " backend=\"flux\",\n", + " backend=\"flux_allocation\",\n", ") as exe:\n", " future_obj = exe.submit(\n", " calc_function,\n", @@ -332,7 +332,7 @@ "with Executor(\n", " max_cores=1,\n", " init_function=init_function,\n", - " backend=\"flux\",\n", + " backend=\"flux_allocation\",\n", " block_allocation=True,\n", ") as exe:\n", " fs = exe.submit(calc, 2, j=5)\n", @@ -462,7 +462,7 @@ "with Executor(\n", " max_cores=2,\n", " resource_dict={\"cores\": 2},\n", - " backend=\"flux\",\n", + " backend=\"flux_allocation\",\n", " flux_executor_pmi_mode=\"pmix\",\n", ") as exe:\n", " fs = exe.submit(calc, 3)\n", @@ -519,7 +519,7 @@ "with Executor(\n", " max_workers=2, \n", " gpus_per_worker=1,\n", - " backend=\"flux\",\n", + " backend=\"flux_allocation\",\n", ") as exe:\n", " fs_1 = exe.submit(get_available_gpus)\n", " fs_2 = exe.submit(get_available_gpus)\n", @@ -627,7 +627,7 @@ " return parameter_a + parameter_b\n", "\n", "\n", - "with Executor(max_cores=2, backend=\"flux\") as exe:\n", + "with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n", " future_1 = exe.submit(\n", " calc_function,\n", " 1,\n", @@ -672,7 +672,7 @@ "```\n", "from executorlib import Executor\n", "\n", - "with Executor(max_cores=1, backend=\"slurm\") as exe:\n", + "with Executor(max_cores=1, backend=\"slurm_allocation\") as exe:\n", " future = exe.submit(sum, [1,1])\n", " print(future.result())\n", "```" @@ -683,7 +683,7 @@ "id": "ae8dd860-f90f-47b4-b3e5-664f5c949350", "metadata": {}, "source": [ - "The `backend=\"slurm\"` parameter is optional as `executorlib` automatically recognizes if [flux framework](https://flux-framework.org)\n", + "The `backend=\"slurm_allocation\"` parameter is optional as `executorlib` automatically recognizes if [flux framework](https://flux-framework.org)\n", "or SLURM are available. \n", "\n", "In addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide\n", diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py index 995e876a..827fd455 100644 --- a/tests/test_cache_executor_pysqa_flux.py +++ b/tests/test_cache_executor_pysqa_flux.py @@ -33,7 +33,7 @@ def mpi_funct(i): class TestCacheExecutorPysqa(unittest.TestCase): def test_executor(self): with Executor( - backend="pysqa_flux", + backend="flux_submission", resource_dict={"cores": 2, "cwd": "cache"}, block_allocation=False, cache_directory="cache", diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index cfcf0733..d3ddeb9a 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -73,6 +73,10 @@ def test_executor_dependency_plot(self): self.assertEqual(len(nodes), 5) self.assertEqual(len(edges), 4) + def test_create_executor_error(self): + with self.assertRaises(ValueError): + create_executor(backend="toast", resource_dict={"cores": 1}) + def test_dependency_steps(self): cloudpickle_register(ind=1) fs1 = Future() diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 6fcf3575..8ab11569 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -47,7 +47,7 @@ def test_flux_executor_serial(self): with Executor( max_cores=2, flux_executor=self.executor, - backend="flux", + backend="flux_allocation", block_allocation=True, ) as exe: fs_1 = exe.submit(calc, 1) @@ -62,7 +62,7 @@ def test_flux_executor_threads(self): max_cores=1, resource_dict={"threads_per_core": 2}, flux_executor=self.executor, - backend="flux", + backend="flux_allocation", block_allocation=True, ) as exe: fs_1 = exe.submit(calc, 1) @@ -77,7 +77,7 @@ def test_flux_executor_parallel(self): max_cores=2, resource_dict={"cores": 2}, flux_executor=self.executor, - backend="flux", + backend="flux_allocation", block_allocation=True, flux_executor_pmi_mode=pmi, ) as exe: @@ -90,7 +90,7 @@ def test_single_task(self): max_cores=2, resource_dict={"cores": 2}, flux_executor=self.executor, - backend="flux", + backend="flux_allocation", block_allocation=True, flux_executor_pmi_mode=pmi, ) as p: @@ -106,7 +106,7 @@ def test_internal_memory(self): resource_dict={"cores": 1}, init_function=set_global, flux_executor=self.executor, - backend="flux", + backend="flux_allocation", block_allocation=True, ) as p: f = p.submit(get_global) diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index b1b18b60..7ba84f71 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -66,7 +66,7 @@ def test_check_pmi(self): with self.assertRaises(ValueError): check_pmi(backend="test", pmi="test") with self.assertRaises(ValueError): - check_pmi(backend="flux", pmi="test") + check_pmi(backend="flux_allocation", pmi="test") def test_check_nested_flux_executor(self): with self.assertRaises(ValueError):