Sample: Using Dask with ESPResSo #4781

Merged (3 commits, Oct 31, 2023)
49 changes: 49 additions & 0 deletions samples/high_throughput_with_dask/Readme.md
@@ -0,0 +1,49 @@
# Introduction

This sample illustrates how to run a large number of short ESPResSo simulations with Dask. Dask is a parallel computing library in Python that enables efficient handling of large datasets and computation tasks.
Note that this sample is not meant to produce meaningful physics results.
The sample consists of the following parts:

- `dask_espresso.py`: contains helper functions that handle running ESPResSo within Dask and communicating data between Dask and ESPResSo
- `lj_pressure.py`: simulation script which obtains the average pressure for a Lennard-Jones liquid at a given volume fraction
- `run_pv.py`: uses Dask to run the simulation script at various volume fractions and obtain a pressure vs. volume fraction curve
- `test_dask_espresso.py`: corresponding unit tests, to be run with `pytest-3`
- `echo.py`: used to mock an ESPResSo simulation for the unit tests

## How to Use

Note: It is not possible to use ESPResSo with `dask.distributed.LocalCluster`. Instead, follow the procedure described below:

1. Set the `PYTHONPATH` environment variable such that it includes the directory in which `dask_espresso.py` resides:
```bash
export PYTHONPATH=/directory/containing/dask_espresso.py:$PYTHONPATH
```
2. Run the Dask scheduler in the background:
```bash
dask scheduler &
```
3. Note the address of the scheduler (e.g., `tcp://127.0.0.1:8786`)
4. Launch a few workers, inserting the correct scheduler address:
```bash
for i in {1..5}; do dask worker SCHEDULER_ADDRESS & done
```
5. Open `run_pv.py` in an editor and adapt the `PYPRESSO` variable to the correct path to `pypresso`
6. Run `python3 run_pv.py SCHEDULER_ADDRESS`, again inserting the scheduler address from above
7. Use `fg` and Ctrl+C to shut down the Dask workers and scheduler

Note that Dask can also be used on compute clusters with HTCondor and SLURM.
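For example, on a SLURM cluster the workers can be submitted as batch jobs that connect to a running scheduler. Below is a minimal sketch using the third-party `dask_jobqueue` package; it is not part of this sample, and the partition name and resource settings are assumptions that must be adapted to your cluster.

```python
# Sketch: starting Dask workers as SLURM jobs via dask_jobqueue (assumed installed).
import dask.distributed
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue="work",    # assumed SLURM partition; adapt to your cluster
    cores=1,         # each worker runs ESPResSo as a single subprocess
    memory="2GB",    # assumed per-job memory request
)
cluster.scale(jobs=5)  # submit 5 worker jobs to SLURM

client = dask.distributed.Client(cluster)
# The client can then be used to submit dask_espresso_task() calls as in run_pv.py.
```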


## Technical Notes

- Since currently only one ESPResSo instance can be used in a Python script, ESPResSo is run as a separate process. This is accomplished by the `dask_espresso_task` function in `dask_espresso.py`.
- The data transfer between Dask and ESPResSo has to be handled such that it is safe for inter-process communication. This is achieved via the `pickle` and `base64` Python modules. Encoding and decoding functions can be found in `dask_espresso.py`.
- The communication happens via the standard input and output of the simulation script. Therefore, it is essential not to use simple `print()` calls in the simulation script. Instead, use the `logging` module for status messages. These will go to the standard error stream.
- To use this sample for your own simulations (a minimal skeleton is sketched after this list):
  - Use `dask_espresso.py` as is.
  - Adapt `run_pv.py` to run simulations with the parameters you need. The keyword arguments passed to `dask_espresso_task()` will be passed as a dictionary to the simulation.
  - Use `data = dask_espresso.get_data_from_stdin()` to get the parameters at the beginning of the simulation script.
  - Use `print(dask_espresso.encode_transport_data(result))` at the end of your simulation to pass the result to Dask.
- The simulation parameters and results can be any Python objects that can be safely pickled and do not require additional context. Basic data types (`int`, `float`, `str`, `list`, `dict`) as well as NumPy arrays work, whereas objects that require additional context to be valid do not (e.g., file objects and ESPResSo particles).
- To test your simulation script, including the transfer of parameters and results, outside Dask, you can also use the `dask_espresso.dask_espresso_task()` function.
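Putting the last few points together, a custom simulation script follows this pattern. This is a minimal sketch, not part of the sample; the parameter `x` and the observable name are placeholders for your own quantities.

```python
# Skeleton of a simulation script driven by dask_espresso (placeholder physics).
import logging
import dask_espresso as de

logger = logging.getLogger(__name__)

# Receive the parameter dictionary sent by dask_espresso_task() via stdin.
params = de.get_data_from_stdin()
logger.info("Parameters: %s", params)  # logging goes to stderr, not stdout

# ... set up and run the actual ESPResSo simulation here ...
my_observable = params["x"] ** 2  # placeholder computation

# Send the picklable result dictionary back via stdout.
result = {"my_observable": my_observable}
print(de.encode_transport_data(result))
```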
56 changes: 56 additions & 0 deletions samples/high_throughput_with_dask/dask_espresso.py
@@ -0,0 +1,56 @@
import pickle
import base64
import sys
import subprocess
import logging
from dask import delayed


def encode_transport_data(data):
"""
Uses pickle and base64 to convert the provided data to a string which can be passed safely between the dask scheduler, worker and Esprsso
jngrad marked this conversation as resolved.
Show resolved Hide resolved
"""
return base64.b64encode(pickle.dumps(data)).decode("utf-8")


def decode_transport_data(encoded_data):
"""
Converts the transport data back to a Python object
via base64 and pickle
"""
pickle_data = base64.b64decode(encoded_data)
return pickle.loads(pickle_data)


def get_data_from_stdin():
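    """Reads encoded parameters from standard input and decodes them."""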
    return decode_transport_data(sys.stdin.read())


@delayed
def dask_espresso_task(pypresso, script, **kwargs):
"""
Runs Esprsso asynchronesouly as a Dask task.

pypresso: string
Path to pypresso
script: string
Simulation script to run with pypresso
kwargs:
The keyword arguments are passed encoded and send to the
RudolfWeeber marked this conversation as resolved.
Show resolved Hide resolved
standard input of the simulation script. Use
`data = get_data_from_Stdin()` to obtain it.
"""

logger = logging.getLogger(__name__)
encoded_data = encode_transport_data(kwargs)
espresso = subprocess.Popen([pypresso,
script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
espresso.stdin.write(encoded_data)
out, err = espresso.communicate()
if err != "":
logger.warning("STDERR output from ESPResSo\n", err)
return decode_transport_data(out)
9 changes: 9 additions & 0 deletions samples/high_throughput_with_dask/dump_test_output.py
@@ -0,0 +1,9 @@
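# Manual test helper: decodes base64 + pickle data from stdin and prints it.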
import pickle
import base64
import sys

base64_data = sys.stdin.read()
pickle_data = base64.b64decode(base64_data)
data = pickle.loads(pickle_data)

print(data)
6 changes: 6 additions & 0 deletions samples/high_throughput_with_dask/echo.py
@@ -0,0 +1,6 @@
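# Mock ESPResSo simulation: reads encoded data from stdin, marks it as
# processed, and echoes it back via standard output.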
import dask_espresso as de

data = de.get_data_from_stdin()
data.update(processed=True)

print(de.encode_transport_data(data))
111 changes: 111 additions & 0 deletions samples/high_throughput_with_dask/lj_pressure.py
@@ -0,0 +1,111 @@

#
# Copyright (C) 2013-2022 The ESPResSo project
#
# This file is part of ESPResSo.
#
# ESPResSo is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ESPResSo is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import espressomd
import numpy as np

import dask_espresso as de


import logging
# logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


params = de.get_data_from_stdin()

logger.info("Parameters:", params)
n_particles = int(params["n_particles"])
volume_fraction = float(params["volume_fraction"])
n_steps = int(params["n_steps"])

required_features = ["LENNARD_JONES"]
espressomd.assert_features(required_features)

rng = np.random.default_rng()

# System
#############################################################

# Interaction parameters (Lennard-Jones)
#############################################################

lj_eps = 1.0 # LJ epsilon
lj_sig = 1.0 # particle diameter
lj_cut = lj_sig * 2**(1. / 6.) # cutoff distance

# System parameters
#############################################################
# volume of N spheres with radius r: N * (4/3*pi*r^3)
box_l = (n_particles * 4. / 3. * np.pi * (lj_sig / 2.)**3
         / volume_fraction)**(1. / 3.)

# System
#############################################################
system = espressomd.System(box_l=3 * [box_l])

# Integration parameters
#############################################################
system.time_step = 0.01
system.cell_system.skin = 0.4

# Interaction setup
#############################################################
system.non_bonded_inter[0, 0].lennard_jones.set_params(
    epsilon=lj_eps, sigma=lj_sig, cutoff=lj_cut, shift="auto")

# Particle setup
#############################################################

system.part.add(pos=rng.random((n_particles, 3)) * system.box_l)

# Warmup Integration
#############################################################

# warmup
logger.info("Warmup")
system.integrator.set_steepest_descent(
    f_max=0, max_displacement=0.01, gamma=1E-3)
system.integrator.run(1)
while np.any(np.abs(system.part.all().f) * system.time_step > .1):
    system.integrator.run(10)

system.integrator.set_vv()
system.thermostat.set_langevin(kT=1.0, gamma=1.0, seed=42)

system.integrator.run(1000)
min_skin = 0.2
max_skin = 1.0
# tuning and equilibration
logger.info("Tune skin: {:.3f}".format(system.cell_system.tune_skin(
    min_skin=min_skin, max_skin=max_skin, tol=0.05, int_steps=100)))
system.integrator.run(1000)

logger.info("Measuring")

pressures = np.zeros(n_steps)
for i in range(n_steps):
    system.integrator.run(10)
    pressures[i] = system.analysis.pressure()["total"]

result = {"pressure": np.mean(pressures),
          "pressure_std_dev": np.std(pressures)}

print(de.encode_transport_data(result))
37 changes: 37 additions & 0 deletions samples/high_throughput_with_dask/run_pv.py
@@ -0,0 +1,37 @@
import sys
import dask.distributed
import logging
import numpy as np

import dask_espresso

logging.basicConfig(level=logging.WARN)


PYPRESSO = "/data/weeber/es/build/pypresso"
SIM_SCRIPT = "lj_pressure.py"

N_STEPS = int(2E4)
N_PARTICLES = 100
VOLUME_FRACTIONS = np.arange(0.1, 0.52, 0.01)


client = dask.distributed.Client(sys.argv[1])
Review comment (Contributor): Is the argument theoretically supposed to be either a `Cluster` instance or `None`, or is it something different altogether?

Reply (Author): I made it clear that this is a scheduler address, that `LocalCluster` does not work, and that clusters with remote workers probably will.

futures = []


for volume_fraction in VOLUME_FRACTIONS:
    sim_params = {"volume_fraction": volume_fraction,
                  "n_particles": N_PARTICLES,
                  "n_steps": N_STEPS}
    futures.append(client.compute(dask_espresso.dask_espresso_task(
        PYPRESSO, SIM_SCRIPT, **sim_params)))

dask.distributed.progress(futures)

sim_results = client.gather(futures)

for vol_frac, sim_result in zip(VOLUME_FRACTIONS, sim_results):
    print(vol_frac, sim_result["pressure"], sim_result["pressure_std_dev"])
17 changes: 17 additions & 0 deletions samples/high_throughput_with_dask/test_dask_espresso.py
@@ -0,0 +1,17 @@
import dask_espresso as de
import numpy as np


def test_encode_decode():
data = {"a": 1, "b": np.random.random((3, 3))}
encoded = de.encode_transport_data(data)
decoded = de.decode_transport_data(encoded)
for k, v in decoded.items():
assert np.all(data[k]) == np.all(v)
assert list(data.keys()) == list(decoded.keys())


def test_espresso_runner():
data = {"hello": "world"}
result = de.dask_espresso_task("python3", "echo.py", **data).compute()
assert result == {"hello": "world", "processed": True}