Skip to content

Commit

Permalink
Merge pull request #392 from msk-mind/1594-singularity
Browse files Browse the repository at this point in the history
1594 singularity
  • Loading branch information
raylim authored Aug 29, 2023
2 parents 2866ceb + 266ddc5 commit 7970242
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 149 deletions.
192 changes: 98 additions & 94 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ trimesh = "^3.22.0"
python-dotenv = "^1.0.0"
geopandas = "^0.13.2"
Rtree = "^1.0.1"
spython = "^0.3.0"

[tool.poetry.dev-dependencies]
pytest = "^7.2.0"
Expand Down
96 changes: 96 additions & 0 deletions src/luna/common/runners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import spython.main

import docker


class DockerRunner:
    """Run a command inside a Docker container through the docker SDK.

    Args:
        image: image reference passed to ``containers.run``.
        command: command line executed inside the container.
        volumes_map: mapping of host path -> container path. Every entry is
            mounted read-write.
        num_cores: CPU allowance; converted to nano-CPUs when the container
            is started.
        max_heap_size: suffix appended to ``-Xmx`` (e.g. ``"64G"``) and
            exported to the container as ``_JAVA_OPTIONS``.
    """

    def __init__(self, image, command, volumes_map, num_cores, max_heap_size):
        self._image = image
        self._command = command
        self._num_cores = num_cores
        self._java_options = f"-Xmx{max_heap_size}"
        # The docker SDK expects {host_path: {"bind": ..., "mode": ...}}.
        self._volumes = {
            host_path: {"bind": container_path, "mode": "rw"}
            for host_path, container_path in volumes_map.items()
        }

    def run(self):
        """Start the container and return the result of ``containers.run``.

        The call is made with ``detach=False`` and ``stream=True``; per the
        docker SDK documentation that combination yields a log generator
        rather than a single string (NOTE(review): confirm item type, the
        SDK normally produces ``bytes``).
        """
        run_kwargs = {
            "image": self._image,
            "command": self._command,
            "volumes": self._volumes,
            "nano_cpus": int(self._num_cores * 1e9),
            "environment": {"_JAVA_OPTIONS": self._java_options},
            "detach": False,
            "stream": True,
        }
        docker_client = docker.from_env()
        return docker_client.containers.run(**run_kwargs)


class DockerRunnerBuilder:
    """Callable that builds ``DockerRunner`` objects for a runner factory."""

    def __init__(self):
        # Most recently built runner (None until the first call).
        self._instance = None

    def __call__(self, image, command, volumes_map, num_cores, max_heap_size, **_ignored):
        """Build a ``DockerRunner`` configured with the given arguments.

        A new runner is created on every call. Returning a cached runner
        would silently reuse the image, command and volumes of the first
        request for every later request made through the same builder.

        Args:
            image: container image to run.
            command: command executed in the container.
            volumes_map: mapping of host path -> container path.
            num_cores: number of CPU cores to allow.
            max_heap_size: Java heap limit, e.g. ``"64G"``.
            **_ignored: extra keyword arguments meant for other runner
                types (for example ``use_gpu``); accepted and discarded.

        Returns:
            The newly created ``DockerRunner``.
        """
        self._instance = DockerRunner(image, command, volumes_map, num_cores, max_heap_size)
        return self._instance


class SingularityRunner:
    """Run a command inside a Singularity image through ``spython``.

    Args:
        image: image passed to ``Client.execute``.
        command: command executed in the image.
        volumes_map: mapping of host path -> container path; each entry
            becomes a ``"host:container"`` bind specification.
        num_cores: value used for ``-XX:ActiveProcessorCount`` in the Java
            options.
        use_gpu: forwarded as the ``nv`` argument of ``Client.execute``.
        max_heap_size: suffix appended to ``-Xmx`` (e.g. ``"64G"``).
    """

    def __init__(self, image, command, volumes_map, num_cores, use_gpu, max_heap_size):
        self._image = image
        self._command = command
        self._num_cores = num_cores
        self._use_gpu = use_gpu
        self._java_options = f"-XX:ActiveProcessorCount={num_cores} -Xmx{max_heap_size}"
        self._volumes = [
            f"{host_path}:{container_path}"
            for host_path, container_path in volumes_map.items()
        ]

    def run(self):
        """Execute the command and return the result of ``Client.execute``.

        The call uses ``stream=True``, so the returned object is expected to
        be iterable line by line (NOTE(review): confirm against the spython
        version in use).
        """
        # The Java options are handed to the container as an environment
        # variable via Singularity's ``--env`` option.
        env_option = ["--env", f"_JAVA_OPTIONS={self._java_options}"]
        return spython.main.Client.execute(
            image=self._image,
            command=self._command,
            bind=self._volumes,
            nv=self._use_gpu,
            options=env_option,
            stream=True,
        )


class SingularityRunnerBuilder:
    """Callable that builds ``SingularityRunner`` objects for a runner factory."""

    def __init__(self):
        # Most recently built runner (None until the first call).
        self._instance = None

    def __call__(
        self,
        image,
        command,
        volumes_map,
        num_cores,
        use_gpu=False,
        max_heap_size="64G",
        **_ignored,
    ):
        """Build a ``SingularityRunner`` configured with the given arguments.

        A new runner is created on every call. Returning a cached runner
        would silently reuse the image, command and volumes of the first
        request for every later request made through the same builder.

        ``use_gpu`` is optional so that callers whose configuration has no
        GPU setting can still select this runner instead of failing with a
        ``TypeError``. ``max_heap_size`` needs a default as well to keep the
        positional parameter order unchanged.

        Args:
            image: Singularity image to run.
            command: command executed in the image.
            volumes_map: mapping of host path -> container path.
            num_cores: number of CPU cores to advertise to the JVM.
            use_gpu: whether to enable GPU support. Defaults to ``False``.
            max_heap_size: Java heap limit. Defaults to ``"64G"``.
            **_ignored: extra keyword arguments meant for other runner
                types; accepted and discarded.

        Returns:
            The newly created ``SingularityRunner``.
        """
        self._instance = SingularityRunner(
            image, command, volumes_map, num_cores, use_gpu, max_heap_size
        )
        return self._instance


class RunnerFactory:
    """Registry mapping a string key to a callable that builds a runner."""

    def __init__(self):
        # key -> builder callable
        self._builders = {}

    def register_builder(self, key, builder):
        """Register ``builder`` under ``key``, replacing any previous entry.

        Args:
            key: identifier later passed to ``create``.
            builder: callable invoked as ``builder(**kwargs)``.
        """
        self._builders[key] = builder

    def create(self, key, **kwargs):
        """Build an object with the builder registered under ``key``.

        Args:
            key: identifier of a previously registered builder.
            **kwargs: forwarded unchanged to the builder.

        Returns:
            Whatever the builder returns.

        Raises:
            ValueError: if no builder is registered under ``key``.
        """
        builder = self._builders.get(key)
        # Compare against None rather than relying on truthiness, so that a
        # registered builder object that happens to be falsy is still used.
        if builder is None:
            known = ", ".join(repr(name) for name in self._builders) or "<none>"
            raise ValueError(
                f"no builder registered for {key!r}; registered keys: {known}"
            )
        return builder(**kwargs)


class RunnerProvider(RunnerFactory):
    """``RunnerFactory`` exposing ``get`` as its lookup entry point."""

    def get(self, runner_type, **kwargs):
        """Return the runner built for ``runner_type``.

        Delegates to ``create``; ``kwargs`` are forwarded unchanged to the
        registered builder.
        """
        runner = self.create(runner_type, **kwargs)
        return runner


# Module-level provider shared by importers; the two runner types are
# registered under upper-case keys at import time.
runner_provider = RunnerProvider()
runner_provider.register_builder(key="DOCKER", builder=DockerRunnerBuilder())
runner_provider.register_builder(key="SINGULARITY", builder=SingularityRunnerBuilder())
128 changes: 88 additions & 40 deletions src/luna/pathology/cli/run_stardist_cell_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pandas as pd
from loguru import logger

import docker
from luna.common.runners import runner_provider
from luna.common.utils import get_config, local_cache_urlpath, save_metadata, timed


Expand All @@ -19,6 +19,9 @@ def stardist_simple(
output_urlpath: str = ".",
debug_opts: str = "",
num_cores: int = 1,
image: str = "mskmind/qupath-stardist:0.4.3",
use_singularity: bool = False,
max_heap_size: str = "64G",
storage_options: dict = {},
output_storage_options: dict = {},
local_config: str = "",
Expand All @@ -32,8 +35,12 @@ def stardist_simple(
image_type (str): qupath image type (BRIGHTFIELD_H_DAB)
output_urlpath (str): output url/path
debug_opts (str): debug options passed as arguments to groovy script
image (str): docker/singularity image
use_singularity (bool): use singularity instead of docker
max_heap_size (str): maximum heap size to pass to java options
storage_options (dict): storage options to pass to reading functions
output_storage_options (dict): storage options to pass to writing functions
local_config (str): local config yaml file
Returns:
pd.DataFrame: metadata about function call
Expand All @@ -56,6 +63,9 @@ def stardist_simple(
config["output_urlpath"],
config["debug_opts"],
config["num_cores"],
config["image"],
config["use_singularity"],
config['max_heap_size'],
config["storage_options"],
config["output_storage_options"],
)
Expand All @@ -66,7 +76,7 @@ def stardist_simple(
logger.info("generated cell data:")
logger.info(df)

output_geojson_file = Path(output_path) / f"cell_detections.geojson"
output_geojson_file = Path(output_path) / "cell_detections.geojson"

properties = {
"cell_objects": str(output_header_file),
Expand All @@ -91,6 +101,9 @@ def stardist_simple_main(
output_urlpath: str,
debug_opts: str,
num_cores: int,
image: str,
use_singularity: bool,
max_heap_size: str,
storage_options: dict,
output_storage_options: dict,
) -> pd.DataFrame:
Expand All @@ -103,6 +116,9 @@ def stardist_simple_main(
image_type (str): qupath image type (BRIGHTFIELD_H_DAB)
output_urlpath (str): output url/path
debug_opts (str): debug options passed as arguments to groovy script
image (str): docker/singularity image
use_singularity (bool): use singularity instead of docker
max_heap_size (str): maximum heap size to pass to java options
storage_options (dict): storage options to pass to reading functions
output_storage_options (dict): storage options to pass to writing functions
Expand All @@ -112,31 +128,39 @@ def stardist_simple_main(
fs, slide_path = fsspec.core.url_to_fs(slide_urlpath, **storage_options)
ofs, output_path = fsspec.core.url_to_fs(output_urlpath, **output_storage_options)

if ofs.protocol == 'file' and not ofs.exists(output_path):
ofs.mkdir(output_path)

runner_type = "DOCKER"
if use_singularity:
runner_type = "SINGULARITY"

slide_filename = Path(slide_path).name
docker_image = "mskmind/qupath-stardist:current"
command = f"QuPath script --image /inputs/{slide_filename} --args [cellSize={cell_expansion_size},imageType={image_type},{debug_opts}] /scripts/stardist_simple.groovy"
logger.info(f"Launching QuPath via {docker_image} ...")
logger.info(f"Launching QuPath via {runner_type}:{image} ...")
logger.info(
f"\tvolumes={slide_urlpath}:'/inputs/{slide_filename}', {slide_path}:'/output_dir'"
)
logger.info(f"\tnano_cpus={int(num_cores * 1e9)}")
logger.info(f"\timage='{docker_image}'")
logger.info(f"\timage='{image}'")
logger.info(f"\tcommand={command}")

client = docker.from_env()
container = client.containers.run(
volumes={
slide_path: {"bind": f"/inputs/{slide_filename}", "mode": "ro"},
output_path: {"bind": "/output_dir", "mode": "rw"},
},
nano_cpus=int(num_cores * 1e9),
image=docker_image,
command=command,
detach=True,
)
volumes_map = {
slide_path: f"/inputs/{slide_filename}",
output_path: "/output_dir",
}

for line in container.logs(stream=True):
print(line.decode(), end="")
runner_config = {
"image": image,
"command": command,
"num_cores": num_cores,
"max_heap_size": max_heap_size,
"volumes_map": volumes_map,
}
runner = runner_provider.get(runner_type, **runner_config)
executor = runner.run()
for line in executor:
print(line)

stardist_output = Path(output_path) / "cell_detections.tsv"

Expand All @@ -158,15 +182,21 @@ def stardist_cell_lymphocyte(
output_urlpath: str = ".",
num_cores: int = 1,
use_gpu: bool = False,
image: str = "mskmind/qupath-stardist:0.4.3",
use_singularity: bool = False,
max_heap_size: str = "64G",
storage_options: dict = {},
output_storage_options: dict = {},
):
"""Run stardist using qupath CLI
Args:
input_slide_image (str): path to slide image (virtual slide formats compatible with openslide, .svs, .tif, .scn, ...)
num_cores (int): Number of cores to use for CPU parallelization
slide_urlpath (str): url/path to slide image (virtual slide formats compatible with openslide, .svs, .tif, .scn, ...)
output_urlpath (str): output url/path
num_cores (int): Number of cores to use for CPU parallelization
use_gpu (bool): use GPU
use_singularity (bool): use singularity instead of docker
max_heap_size (str): maximum heap size to pass to java options
storage_options (dict): storage options to pass to reading functions
output_storage_options (dict): storage options to pass to writing functions
Expand All @@ -189,18 +219,20 @@ def stardist_cell_lymphocyte(
config["output_urlpath"],
config["num_cores"],
config["use_gpu"],
config["image"],
config["use_singularity"],
config["max_heap_size"],
config["storage_options"],
config["output_storage_options"],
)


with fs.open(output_header_file, "wb") as of:
df.to_parquet(of)

logger.info("generated cell data:")
logger.info(df)

output_geojson_file = Path(output_path) / f"cell_detections.geojson"
output_geojson_file = Path(output_path) / "cell_detections.geojson"

properties = {
"cell_objects": str(output_header_file),
Expand All @@ -223,15 +255,21 @@ def stardist_cell_lymphocyte_main(
output_urlpath: str,
num_cores: int,
use_gpu: bool = False,
image: str = "mskmind/qupath-stardist:0.4.3",
use_singularity: bool = False,
max_heap_size: str = "64G",
storage_options: dict = {},
output_storage_options: dict = {},
) -> pd.DataFrame:
"""Run stardist using qupath CLI
Args:
input_slide_image (str): path to slide image (virtual slide formats compatible with openslide, .svs, .tif, .scn, ...)
num_cores (int): Number of cores to use for CPU parallelization
slide_urlpath (str): url/path to slide image (virtual slide formats compatible with openslide, .svs, .tif, .scn, ...)
output_urlpath (str): output url/path
num_cores (int): Number of cores to use for CPU parallelization
use_gpu (bool): use GPU
use_singularity (bool): use singularity instead of docker
max_heap_size (str): maximum heap size to pass to java options
storage_options (dict): storage options to pass to reading functions
Returns:
Expand All @@ -241,35 +279,45 @@ def stardist_cell_lymphocyte_main(

ofs, output_path = fsspec.core.url_to_fs(output_urlpath, **output_storage_options)

if ofs.protocol == 'file' and not ofs.exists(output_path):
ofs.mkdir(output_path)

qupath_cmd = "QuPath-cpu"
if use_gpu:
qupath_cmd = "QuPath-gpu"

runner_type = "DOCKER"
if use_singularity:
runner_type = "SINGULARITY"


slide_filename = Path(slide_path).name
docker_image = "mskmind/qupath-tensorflow:latest"
command = f"{qupath_cmd} script --image /inputs/{slide_filename} /scripts/stardist_nuclei_and_lymphocytes.groovy"
logger.info("Launching docker container:")
logger.info(f"Launching {runner_type} container:")
logger.info(
f"\tvolumes={slide_path}:'/inputs/{slide_filename}', {output_path}:'/output_dir'"
)
logger.info(f"\tnano_cpus={int(num_cores * 1e9)}")
logger.info(f"\timage='{docker_image}'")
logger.info(f"\timage='{image}'")
logger.info(f"\tcommand={command}")

client = docker.from_env()
container = client.containers.run(
volumes={
slide_path: {"bind": f"/inputs/{slide_filename}", "mode": "ro"},
output_path: {"bind": "/output_dir", "mode": "rw"},
},
nano_cpus=int(num_cores * 1e9),
image=docker_image,
command=command,
detach=True,
)
volumes_map = {
slide_path: f"/inputs/{slide_filename}",
output_path: "/output_dir",
}

for line in container.logs(stream=True):
print(line.decode(), end="")
runner_config = {
"image": image,
"command": command,
"num_cores": num_cores,
"max_heap_size": max_heap_size,
"volumes_map": volumes_map,
"use_gpu": use_gpu,
}
runner = runner_provider.get(runner_type, **runner_config)
executor = runner.run()
for line in executor:
print(line)

stardist_output = Path(output_path) / "cell_detections.tsv"

Expand Down
Loading

0 comments on commit 7970242

Please sign in to comment.