Skip to content

Commit

Permalink
Merge pull request #933 from haddocking/930-optimize-job-preparation-in-rigidbody
Browse files Browse the repository at this point in the history

Refactor `rigidbody` module to allow the preparation of `cns_input` to be done in parallel
  • Loading branch information
rvhonorato authored Jul 15, 2024
2 parents 919dddb + c14f80b commit 27cf05d
Show file tree
Hide file tree
Showing 9 changed files with 19,218 additions and 49 deletions.
118 changes: 118 additions & 0 deletions integration_tests/test_rigidbody.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import shutil
import tempfile
from pathlib import Path

import pytest

from haddock.libs.libontology import Format, PDBFile, Persistent
from haddock.modules.sampling.rigidbody import (
DEFAULT_CONFIG as DEFAULT_RIGIDBODY_CONFIG,
)
from haddock.modules.sampling.rigidbody import HaddockModule as RigidbodyModule
from tests import golden_data


@pytest.fixture
def rigidbody_module():
    """Yield a `rigidbody` module rooted in a throw-away directory.

    The module is built with the default rigidbody configuration; the
    temporary directory is cleaned up once the `with` block is left, i.e.
    after the test consuming the fixture has finished.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        module_path = Path(scratch_dir)
        yield RigidbodyModule(
            order=0,
            path=module_path,
            initial_params=DEFAULT_RIGIDBODY_CONFIG,
        )


class MockPreviousIO:
    """Stand-in for the IO object of a previous step.

    It serves a single combination made of two golden-data molecules, each
    one described by a structure (`.pdb`) and a topology (`.psf`) file.
    """

    def __init__(self, path):
        self.path = path

    def retrieve_models(self, crossdock: bool = False):
        """Stage the golden files in the current directory and describe them.

        Returns a list holding one combination (a list of two `PDBFile`),
        every model carrying its topology as a `Persistent` entry.
        """
        # (structure, topology) file names, in the order they are copied
        molecules = (
            ("e2aP_1F3G_haddock.pdb", "e2aP_1F3G_haddock.psf"),
            ("hpr_ensemble_1_haddock.pdb", "hpr_ensemble_1_haddock.psf"),
        )

        for file_pair in molecules:
            for fname in file_pair:
                shutil.copy(Path(golden_data, fname), Path(".", fname))

        combination = [
            PDBFile(
                file_name=pdb_fname,
                path=".",
                topology=[
                    Persistent(
                        file_name=psf_fname,
                        path=".",
                        file_type=Format.TOPOLOGY,
                    )
                ],
            )
            for pdb_fname, psf_fname in molecules
        ]

        return [combination]

    def output(self):
        """Do nothing; the mock has no output to persist."""
        return None


def test_rigidbody(rigidbody_module):
    """Run `rigidbody` and check that every model leaves its files behind.

    For each sampled model the `.pdb`, `.out.gz` and `.inp` files must exist
    and be non-empty, while no `.seed` file may be present.
    """
    sampling = 5
    rigidbody_module.previous_io = MockPreviousIO(path=rigidbody_module.path)

    overrides = {
        "sampling": sampling,
        "cmrest": True,
        "mol_fix_origin_1": True,
        "mol_fix_origin_2": False,
    }
    for key, value in overrides.items():
        rigidbody_module.params[key] = value

    rigidbody_module.run()

    for n in range(1, sampling + 1):
        pdb_file = Path(rigidbody_module.path, f"rigidbody_{n}.pdb")
        log_file = Path(rigidbody_module.path, f"rigidbody_{n}.out.gz")
        inp_file = Path(rigidbody_module.path, f"rigidbody_{n}.inp")
        seed_file = Path(rigidbody_module.path, f"rigidbody_{n}.seed")

        # Presence of the expected files, absence of the seed file
        assert pdb_file.exists()
        assert log_file.exists()
        assert inp_file.exists()
        assert not seed_file.exists()

        # None of the produced files may be empty
        assert pdb_file.stat().st_size > 0
        assert log_file.stat().st_size > 0
        assert inp_file.stat().st_size > 0


def test_rigidbody_less_io(rigidbody_module):
    """Run `rigidbody` with `less_io` enabled and check the reduced output.

    Only the non-empty `.pdb` of each sampled model is expected; the
    `.out.gz`, `.inp` and `.seed` files must all be absent.
    """
    sampling = 5
    rigidbody_module.previous_io = MockPreviousIO(path=rigidbody_module.path)

    overrides = {
        "sampling": sampling,
        "cmrest": True,
        "mol_fix_origin_1": True,
        "mol_fix_origin_2": False,
        "less_io": True,
    }
    for key, value in overrides.items():
        rigidbody_module.params[key] = value

    rigidbody_module.run()

    for n in range(1, sampling + 1):
        pdb_file = Path(rigidbody_module.path, f"rigidbody_{n}.pdb")
        log_file = Path(rigidbody_module.path, f"rigidbody_{n}.out.gz")
        inp_file = Path(rigidbody_module.path, f"rigidbody_{n}.inp")
        seed_file = Path(rigidbody_module.path, f"rigidbody_{n}.seed")

        # Only the structure survives when `less_io` is on
        assert pdb_file.exists()
        assert not log_file.exists()
        assert not inp_file.exists()
        assert not seed_file.exists()

        assert pdb_file.stat().st_size > 0
14 changes: 14 additions & 0 deletions src/haddock/libs/libparallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ def get_index_list(nmodels, ncores):
return index_list


class GenericTask:
    """Bundle a callable with its arguments so it can be executed later."""

    def __init__(self, function, *args, **kwargs):
        """Store `function` together with the arguments it will be called with.

        Raises
        ------
        TypeError
            If `function` is not callable.
        """
        if not callable(function):
            raise TypeError("The 'function' argument must be callable")
        self.function, self.args, self.kwargs = function, args, kwargs

    def run(self):
        """Call the stored function with the stored arguments; return its result."""
        target = self.function
        positional, named = self.args, self.kwargs
        return target(*positional, **named)


class Worker(Process):
"""Work on tasks."""

Expand Down
158 changes: 120 additions & 38 deletions src/haddock/modules/sampling/rigidbody/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
sure to sample enough the possible interaction space.
"""

from datetime import datetime
from pathlib import Path

from haddock.core.typing import FilePath
from haddock.core.typing import FilePath, Sequence, Union
from haddock.gear.haddockmodel import HaddockModel
from haddock.libs.libcns import prepare_cns_input
from haddock.libs.libontology import PDBFile
from haddock.libs.libparallel import GenericTask, Scheduler
from haddock.libs.libsubprocess import CNSJob
from haddock.modules import get_engine
from haddock.modules.base_cns_module import BaseCNSModule
Expand All @@ -60,6 +62,111 @@ def confirm_installation(cls) -> None:
"""Confirm module is installed."""
return

def make_cns_jobs(
    self,
    inp_list: Sequence[
        tuple[list[PDBFile], Union[Path, str], Union[str, None], int]
    ],
) -> list[CNSJob]:
    """Create one `CNSJob` per prepared input and register its expected model.

    Parameters
    ----------
    inp_list : sequence of tuple
        Items of the form `(combination, cns_input, ambig_fname, seed)`.
        Their position in the sequence (counted from 1) numbers the
        `rigidbody_<n>.pdb` / `rigidbody_<n>.out` files.

    Returns
    -------
    list of CNSJob
        The jobs, in the same order as `inp_list`.

    Notes
    -----
    The expected output model of every job is appended to
    `self.output_models`, which must already exist.
    """
    cns_jobs = []
    numbered_inputs = enumerate(inp_list, start=1)
    for number, (combination, cns_input, ambig_fname, seed) in numbered_inputs:
        # Describe the structure this job is expected to write
        expected_model = PDBFile(
            f"rigidbody_{number}.pdb",
            path=".",
            restr_fname=ambig_fname,
        )
        expected_model.topology = [molecule.topology for molecule in combination]
        expected_model.seed = seed  # type: ignore
        self.output_models.append(expected_model)

        cns_jobs.append(
            CNSJob(cns_input, f"rigidbody_{number}.out", envvars=self.envvars)
        )
    return cns_jobs

def prepare_cns_input_sequential(
    self,
    models_to_dock: list[list[PDBFile]],
    sampling_factor: int,
    ambig_fnames: Union[list, None],
) -> list[tuple[list[PDBFile], Union[Path, str], Union[str, None], int]]:
    """Prepare the CNS input of every docking run, one after the other.

    Each combination in `models_to_dock` is used `sampling_factor` times;
    runs are numbered from 1 in that order.

    Parameters
    ----------
    models_to_dock : list of list of PDBFile
        The combinations of models to dock.
    sampling_factor : int
        How many runs are prepared for each combination.
    ambig_fnames : list or None
        When truthy, run number `n` uses `ambig_fnames[n - 1]`; otherwise
        every run uses `self.params["ambig_fname"]`.

    Returns
    -------
    list of tuple
        `(combination, cns_input, ambig_fname, seed)` per run, where
        `cns_input` is the value returned by `prepare_cns_input` and
        `seed` is `self.params["iniseed"]` plus the run number.
    """
    # Flatten the "combination x sampling" nesting into a single stream
    runs = (
        combination
        for combination in models_to_dock
        for _ in range(sampling_factor)
    )

    prepared = []
    for idx, combination in enumerate(runs, start=1):
        ambig_fname = (
            ambig_fnames[idx - 1] if ambig_fnames else self.params["ambig_fname"]
        )
        seed = self.params["iniseed"] + idx
        cns_input = prepare_cns_input(
            idx,
            combination,
            self.path,
            self.recipe_str,
            self.params,
            "rigidbody",
            ambig_fname=ambig_fname,
            default_params_path=self.toppar_path,
            native_segid=True,
            less_io=self.params["less_io"],
            seed=seed,
        )
        prepared.append((combination, cns_input, ambig_fname, seed))
    return prepared

def prepare_cns_input_parallel(
    self,
    models_to_dock: list[list[PDBFile]],
    sampling_factor: int,
    ambig_fnames: Union[list, None],
) -> list[tuple[list[PDBFile], Union[Path, str], Union[str, None], int]]:
    """Prepare the CNS input of every docking run through the execution engine.

    One `GenericTask` wrapping `prepare_cns_input` is created per run (each
    combination in `models_to_dock` is used `sampling_factor` times, runs
    numbered from 1) and the tasks are executed by the engine selected with
    `self.params["mode"]`.

    Parameters
    ----------
    models_to_dock : list of list of PDBFile
        The combinations of models to dock.
    sampling_factor : int
        How many runs are prepared for each combination.
    ambig_fnames : list or None
        When truthy, run number `n` uses `ambig_fnames[n - 1]`; otherwise
        every run uses `self.params["ambig_fname"]`.

    Returns
    -------
    list of tuple
        `(combination, cns_input, ambig_fname, seed)` per run, where
        `cns_input` is the corresponding entry of the engine's `results`
        and `seed` is `self.params["iniseed"]` plus the run number.

    Raises
    ------
    TypeError
        If the selected engine is not a `Scheduler`, the only engine this
        method reads `results` from.
    """
    prepare_tasks = []
    pending = []
    idx = 1
    for combination in models_to_dock:
        for _ in range(sampling_factor):
            ambig_fname = (
                ambig_fnames[idx - 1]
                if ambig_fnames
                else self.params["ambig_fname"]
            )
            seed = self.params["iniseed"] + idx
            task = GenericTask(
                function=prepare_cns_input,
                model_number=idx,
                input_element=combination,
                step_path=self.path,
                recipe_str=self.recipe_str,
                defaults=self.params,
                identifier="rigidbody",
                ambig_fname=ambig_fname,
                native_segid=True,
                default_params_path=self.toppar_path,
                less_io=self.params["less_io"],
                seed=seed,
            )

            prepare_tasks.append(task)
            pending.append((combination, ambig_fname, seed))
            idx += 1

    Engine = get_engine(self.params["mode"], self.params)
    prepare_engine = Engine(prepare_tasks)

    # Validate explicitly rather than with `assert`: assertions are removed
    # under `python -O`, and checking before `run()` avoids executing every
    # task only to fail when the results are collected.
    if not isinstance(prepare_engine, Scheduler):
        raise TypeError(
            "Parallel preparation of the CNS input requires a `Scheduler` "
            f"engine, got `{type(prepare_engine).__name__}`"
        )
    prepare_engine.run()

    # Pair every run with the result of its task.
    # NOTE(review): assumes `prepare_engine.results` is ordered like the
    # submitted tasks - confirm against `Scheduler`.
    return [
        (combination, task_result, ambig_fname, seed)
        for (combination, ambig_fname, seed), task_result in zip(
            pending, prepare_engine.results
        )
    ]

def _run(self) -> None:
"""Execute module."""
# Pool of jobs to be executed by the CNS engine
Expand Down Expand Up @@ -98,48 +205,23 @@ def _run(self) -> None:
else:
ambig_fnames = None

# Prepare the jobs
idx = 1
start = datetime.now()
self.output_models: list[PDBFile] = []
self.log("Preparing jobs...")
for combination in models_to_dock:
for _i in range(sampling_factor):
# assign ambig_fname
if ambig_fnames:
ambig_fname = ambig_fnames[idx - 1]
else:
ambig_fname = self.params["ambig_fname"]

seed = self.params["iniseed"] * idx
# prepare cns input
rigidbody_input = prepare_cns_input(
idx,
combination,
self.path,
self.recipe_str,
self.params,
"rigidbody",
ambig_fname=ambig_fname,
default_params_path=self.toppar_path,
native_segid=True,
less_io=self.params["less_io"],
seed=seed,
)

log_fname = f"rigidbody_{idx}.out"
output_pdb_fname = f"rigidbody_{idx}.pdb"

# Create a model for the expected output
model = PDBFile(output_pdb_fname, path=".", restr_fname=ambig_fname)
model.topology = [e.topology for e in combination] # type: ignore
model.seed = seed
if self.params["mode"] == "batch":
cns_input = self.prepare_cns_input_sequential(
models_to_dock, sampling_factor, ambig_fnames # type: ignore
)

self.output_models.append(model)
else:
cns_input = self.prepare_cns_input_parallel(
models_to_dock, sampling_factor, ambig_fnames # type: ignore
)
end = datetime.now()

job = CNSJob(rigidbody_input, log_fname, envvars=self.envvars)
jobs.append(job)
self.log(f"Preparation took {(end - start).total_seconds()} seconds")

idx += 1
jobs = self.make_cns_jobs(cns_input)

# Run CNS Jobs
self.log(f"Running CNS Jobs n={len(jobs)}")
Expand Down
Loading

0 comments on commit 27cf05d

Please sign in to comment.