Merge pull request #337 from pyiron/release_openmpi_constraint

Release openmpi constraint - Test with openmpi 5.0

jan-janssen authored May 29, 2024
2 parents 75613c1 + 4af4e97 commit 635acdb

Showing 15 changed files with 135 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .ci_support/environment-openmpi.yml
@@ -3,7 +3,7 @@ channels:
dependencies:
- python
- numpy
- openmpi =4.1.6
- openmpi
- cloudpickle =3.0.0
- mpi4py =3.1.6
- tqdm =4.66.4
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
@@ -53,4 +53,4 @@ jobs:
cat timing.log
python -m unittest tests/benchmark/test_results.py
env:
OMPI_MCA_rmaps_base_oversubscribe: 'yes'
PRTE_MCA_rmaps_default_mapping_policy: ':oversubscribe'
40 changes: 40 additions & 0 deletions .github/workflows/unittest-flux-mpich.yml
@@ -0,0 +1,40 @@
name: Unittests-flux-mpich

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Extend environment
shell: bash -l {0}
timeout-minutes: 5
run: |
echo -e '- coverage\n- flux-core =0.59.0\n- versioneer =0.28'>> .ci_support/environment-mpich.yml
cat .ci_support/environment-mpich.yml
- uses: conda-incubator/setup-miniconda@v2.2.0
with:
python-version: '3.12'
mamba-version: "*"
channels: conda-forge
miniforge-variant: Mambaforge
channel-priority: strict
auto-update-conda: true
environment-file: .ci_support/environment-mpich.yml
- name: Test
shell: bash -l {0}
timeout-minutes: 5
run: |
pip install . --no-deps --no-build-isolation
python -m unittest discover tests
- name: Test Flux
shell: bash -l {0}
timeout-minutes: 5
run: >
flux start
python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
45 changes: 45 additions & 0 deletions .github/workflows/unittest-flux-openmpi.yml
@@ -0,0 +1,45 @@
name: Unittests-flux-openmpi

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Extend environment
shell: bash -l {0}
timeout-minutes: 5
run: |
echo -e '- coverage\n- flux-core =0.59.0\n- flux-pmix=0.5.0\n- versioneer =0.28' >> .ci_support/environment-openmpi.yml
cat .ci_support/environment-openmpi.yml
- uses: conda-incubator/setup-miniconda@v2.2.0
with:
python-version: '3.12'
mamba-version: "*"
channels: conda-forge
miniforge-variant: Mambaforge
channel-priority: strict
auto-update-conda: true
environment-file: .ci_support/environment-openmpi.yml
- name: Test
shell: bash -l {0}
timeout-minutes: 5
run: |
pip install . --no-deps --no-build-isolation
coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest discover tests
- name: Test Flux with OpenMPI
shell: bash -l {0}
timeout-minutes: 5
run: >
flux start
coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
coverage xml
env:
PYMPIPOOL_PMIX: "pmix"
- name: Coveralls
uses: coverallsapp/github-action@v2
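The openmpi workflow above exports `PYMPIPOOL_PMIX: "pmix"` for the Flux test step, which is how the new `pmix` bootstrap is selected under OpenMPI 5. The tests themselves are not part of this diff, so the following is only a hedged sketch of how such a variable could be turned into the new `pmi` argument; the `backend` and `max_cores` keywords are assumptions borrowed from `create_executor` further down, and the test class is hypothetical.

```python
import os
import unittest

import pympipool


class TestFluxWithPmix(unittest.TestCase):
    """Hypothetical sketch: only the PYMPIPOOL_PMIX name comes from the workflow above."""

    def test_submit_with_pmi_from_env(self):
        # "pmix" in the openmpi workflow, unset (None) in the mpich workflow
        pmi = os.environ.get("PYMPIPOOL_PMIX")
        with pympipool.Executor(max_cores=1, backend="flux", pmi=pmi) as exe:
            self.assertEqual(exe.submit(sum, [1, 2, 3]).result(), 6)
```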
58 changes: 0 additions & 58 deletions .github/workflows/unittest-flux.yml

This file was deleted.

3 changes: 2 additions & 1 deletion binder/environment.yml
@@ -3,10 +3,11 @@ channels:
dependencies:
- python
- numpy
- openmpi =4.1.6
- openmpi
- cloudpickle =3.0.0
- mpi4py =3.1.6
- tqdm =4.66.2
- pyzmq =26.0.0
- flux-core =0.59.0
- flux-pmix =0.5.0
- versioneer =0.28
7 changes: 6 additions & 1 deletion docs/installation.md
@@ -51,8 +51,13 @@ conda install -c conda-forge flux-core flux-sched mpich=>4 pympipool
Flux is not limited to mpich / cray mpi; it can also be installed in compatibility with openmpi or intel mpi using the
openmpi package:
```
conda install -c conda-forge flux-core flux-sched openmpi pympipool
conda install -c conda-forge flux-core flux-sched openmpi=4.1.6 pympipool
```
For version 5 of openmpi the backend changed to `pmix`, which requires the additional `flux-pmix` plugin:
```
conda install -c conda-forge flux-core flux-sched flux-pmix openmpi>=5 pympipool
```
In addition, the `pmi="pmix"` parameter has to be set for the `pympipool.Executor` to switch to `pmix` as the backend.

## Test Flux Framework
To validate the installation of flux and confirm the GPUs are correctly recognized, you can start a flux session on the
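The installation note above introduces the `pmi="pmix"` requirement for OpenMPI 5 under Flux. A minimal usage sketch follows; the `pmi` keyword is the one added in this pull request, while `backend="flux"` and `max_cores` are assumed parameter names that do not appear in the installation hunk itself.

```python
from pympipool import Executor


def calc(i):
    return i ** 2


# With flux-core, flux-pmix and openmpi >=5 installed, request the pmix
# bootstrap explicitly; with openmpi 4.1.x the argument can stay at its
# default of None.
with Executor(max_cores=2, backend="flux", pmi="pmix") as exe:
    print(exe.submit(calc, 3).result())  # 9
```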
2 changes: 1 addition & 1 deletion notebooks/examples.ipynb

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pympipool/__init__.py
@@ -48,6 +48,7 @@ class Executor:
function.
init_function (None): optional function to preset arguments for functions which are submitted later
command_line_argument_lst (list): Additional command line arguments for the srun call (SLURM only)
pmi (str): PMI interface to use (OpenMPI v5 requires pmix); default is None (Flux only)
Examples:
```
@@ -85,6 +86,7 @@ def __init__(
block_allocation: bool = True,
init_function: Optional[callable] = None,
command_line_argument_lst: list[str] = [],
pmi: Optional[str] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
):
@@ -106,6 +108,7 @@ def __new__(
block_allocation: bool = False,
init_function: Optional[callable] = None,
command_line_argument_lst: list[str] = [],
pmi: Optional[str] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
):
@@ -143,6 +146,7 @@ def __new__(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
command_line_argument_lst (list): Additional command line arguments for the srun call (SLURM only)
pmi (str): PMI interface to use (OpenMPI v5 requires pmix); default is None (Flux only)
disable_dependencies (boolean): Disable resolving future objects during the submission.
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
@@ -162,6 +166,7 @@ def __new__(
block_allocation=block_allocation,
init_function=init_function,
command_line_argument_lst=command_line_argument_lst,
pmi=pmi,
refresh_rate=refresh_rate,
)
else:
@@ -180,4 +185,5 @@ def __new__(
block_allocation=block_allocation,
init_function=init_function,
command_line_argument_lst=command_line_argument_lst,
pmi=pmi,
)
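The signature hunks above show the new `pmi` argument travelling alongside the existing `block_allocation` and `init_function` options. The sketch below combines them; it presumes, based on the docstring, that the dictionary returned by `init_function` presets keyword arguments of later submissions, which is an assumption rather than something visible in this diff.

```python
from pympipool import Executor


def preload():
    # Presumably injected as keyword arguments into later submissions
    # when block_allocation=True (based on the docstring above).
    return {"offset": 10}


def shifted(i, offset):
    return i + offset


with Executor(
    max_cores=1,
    block_allocation=True,
    init_function=preload,
    pmi=None,  # only relevant for the flux backend; OpenMPI 5 there needs "pmix"
) as exe:
    print(exe.submit(shifted, 5).result())  # 15
```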
6 changes: 6 additions & 0 deletions pympipool/scheduler/__init__.py
@@ -13,6 +13,7 @@
check_oversubscribe,
check_executor,
check_init_function,
check_pmi,
validate_backend,
validate_number_of_cores,
)
@@ -50,6 +51,7 @@ def create_executor(
block_allocation: bool = False,
init_function: Optional[callable] = None,
command_line_argument_lst: list[str] = [],
pmi: Optional[str] = None,
):
"""
Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor,
@@ -83,13 +85,15 @@
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
command_line_argument_lst (list): Additional command line arguments for the srun call (SLURM only)
pmi (str): PMI interface to use (OpenMPI v5 requires pmix); default is None (Flux only)
"""
max_cores = validate_number_of_cores(max_cores=max_cores, max_workers=max_workers)
check_init_function(block_allocation=block_allocation, init_function=init_function)
backend = validate_backend(
backend=backend, flux_installed=flux_installed, slurm_installed=slurm_installed
)
check_pmi(backend=backend, pmi=pmi)
if backend == "flux":
check_oversubscribe(oversubscribe=oversubscribe)
check_command_line_argument_lst(
@@ -105,6 +109,7 @@
cwd=cwd,
executor=executor,
hostname_localhost=hostname_localhost,
pmi=pmi,
)
else:
return PyFluxStepExecutor(
@@ -115,6 +120,7 @@
cwd=cwd,
executor=executor,
hostname_localhost=hostname_localhost,
pmi=pmi,
)
elif backend == "slurm":
check_executor(executor=executor)
10 changes: 10 additions & 0 deletions pympipool/scheduler/flux.py
@@ -28,6 +28,7 @@ class PyFluxExecutor(ExecutorBroker):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
pmi (str): PMI interface to use (OpenMPI v5 requires pmix); default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -66,6 +67,7 @@ def __init__(
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
executor: Optional[flux.job.FluxExecutor] = None,
pmi: Optional[str] = None,
hostname_localhost: Optional[bool] = False,
):
super().__init__()
@@ -85,6 +87,7 @@ def __init__(
"gpus_per_core": int(gpus_per_worker / cores_per_worker),
"cwd": cwd,
"executor": executor,
"pmi": pmi,
},
)
for _ in range(max_workers)
@@ -106,6 +109,7 @@ class PyFluxStepExecutor(ExecutorSteps):
gpus_per_worker (int): number of GPUs per worker - defaults to 0
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
pmi (str): PMI interface to use (OpenMPI v5 requires pmix); default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -141,6 +145,7 @@ def __init__(
gpus_per_worker: int = 0,
cwd: Optional[str] = None,
executor: Optional[flux.job.FluxExecutor] = None,
pmi: Optional[str] = None,
hostname_localhost: Optional[bool] = False,
):
super().__init__()
@@ -159,6 +164,7 @@ def __init__(
"gpus_per_core": int(gpus_per_worker / cores_per_worker),
"cwd": cwd,
"executor": executor,
"pmi": pmi,
},
)
)
@@ -173,6 +179,7 @@ def __init__(
gpus_per_core: int = 0,
oversubscribe: bool = False,
executor: Optional[flux.job.FluxExecutor] = None,
pmi: Optional[str] = None,
):
super().__init__(
cwd=cwd,
@@ -182,6 +189,7 @@ def __init__(
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._executor = executor
self._pmi = pmi
self._future = None

def bootup(self, command_lst: list[str]):
@@ -200,6 +208,8 @@ def bootup(self, command_lst: list[str]):
exclusive=False,
)
jobspec.environment = dict(os.environ)
if self._pmi is not None:
jobspec.setattr_shell_option("pmi", self._pmi)
if self._cwd is not None:
jobspec.cwd = self._cwd
self._future = self._executor.submit(jobspec)
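The `bootup()` hunk above is where the new `pmi` value takes effect: it is attached to the Flux jobspec as the `pmi` shell option before submission. Below is a standalone sketch of the same `flux.job` calls outside pympipool; it assumes flux-core's Python bindings are importable, that a Flux instance is running, and the example command is arbitrary.

```python
import os

import flux.job

# Build a jobspec, pin the PMI shell option, and hand it to the FluxExecutor,
# mirroring the calls in bootup() above.
jobspec = flux.job.JobspecV1.from_command(
    ["python", "-m", "mpi4py.bench", "helloworld"],
    num_tasks=2,
    cores_per_task=1,
)
jobspec.environment = dict(os.environ)
jobspec.setattr_shell_option("pmi", "pmix")  # what pympipool sets when pmi="pmix"
jobspec.cwd = os.getcwd()

with flux.job.FluxExecutor() as executor:
    future = executor.submit(jobspec)
    print(future.jobid())  # blocks until Flux has assigned a job id
```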
11 changes: 10 additions & 1 deletion pympipool/shared/inputcheck.py
@@ -1,5 +1,5 @@
import inspect
from typing import List
from typing import List, Optional
from concurrent.futures import Executor


@@ -85,6 +85,15 @@ def validate_backend(
return "mpi"


def check_pmi(backend: str, pmi: Optional[str]):
if backend != "flux" and pmi is not None:
raise ValueError("The pmi parameter is currently only implemented for flux.")
elif backend == "flux" and pmi not in ["pmix", "pmi1", "pmi2", None]:
raise ValueError(
"The pmi parameter supports [pmix, pmi1, pmi2], but not: " + pmi
)


def check_init_function(block_allocation: bool, init_function: callable):
if not block_allocation and init_function is not None:
raise ValueError("")
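The new `check_pmi` helper above guards the added argument: a non-flux backend may only pass `None`, and under flux anything other than `pmix`, `pmi1`, `pmi2` or `None` is rejected. A quick illustration, using the import path from the diff header:

```python
from pympipool.shared.inputcheck import check_pmi

check_pmi(backend="flux", pmi="pmix")  # accepted: OpenMPI 5 under Flux
check_pmi(backend="flux", pmi=None)    # accepted: keep the Flux default

try:
    check_pmi(backend="mpi", pmi="pmix")
except ValueError as err:
    print(err)  # The pmi parameter is currently only implemented for flux.

try:
    check_pmi(backend="flux", pmi="openmpi")
except ValueError as err:
    print(err)  # The pmi parameter supports [pmix, pmi1, pmi2], but not: openmpi
```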