1594 singularity #392

Merged (2 commits, Aug 29, 2023)

192 changes: 98 additions & 94 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -78,6 +78,7 @@ trimesh = "^3.22.0"
python-dotenv = "^1.0.0"
geopandas = "^0.13.2"
Rtree = "^1.0.1"
spython = "^0.3.0"

[tool.poetry.dev-dependencies]
pytest = "^7.2.0"
96 changes: 96 additions & 0 deletions src/luna/common/runners.py
@@ -0,0 +1,96 @@
import spython.main

import docker


class DockerRunner:
    """Run a containerized command through the Docker Python SDK (docker-py)."""

def __init__(self, image, command, volumes_map, num_cores, max_heap_size):
self._image = image
self._command = command
self._volumes = {}
self._num_cores = num_cores
self._java_options = f"-Xmx{max_heap_size}"
for k, v in volumes_map.items():
self._volumes[k] = {"bind": v, "mode": "rw"}

    def run(self):
        client = docker.from_env()
        # With detach=False and stream=True, containers.run() returns a
        # generator of log output rather than a Container object.
        container = client.containers.run(
volumes=self._volumes,
nano_cpus=int(self._num_cores * 1e9),
image=self._image,
command=self._command,
environment={"_JAVA_OPTIONS": self._java_options},
detach=False,
stream=True,
)
return container


class DockerRunnerBuilder:
    """Build and cache a single DockerRunner instance."""

def __init__(self):
self._instance = None

def __call__(self, image, command, volumes_map, num_cores, max_heap_size, **_ignored):
if not self._instance:
self._instance = DockerRunner(image, command, volumes_map, num_cores, max_heap_size)
return self._instance


class SingularityRunner:
    """Run a containerized command through the Singularity Python client (spython)."""

def __init__(self, image, command, volumes_map, num_cores, use_gpu, max_heap_size):
self._image = image
self._command = command
self._num_cores = num_cores
self._use_gpu = use_gpu
self._volumes = []
self._java_options = f"-XX:ActiveProcessorCount={num_cores} -Xmx{max_heap_size}"
for k, v in volumes_map.items():
self._volumes.append(f"{k}:{v}")

    def run(self):
        # With stream=True, spython's Client.execute() returns a generator
        # that yields the container's output line by line.
        executor = spython.main.Client.execute(
image=self._image,
command=self._command,
bind=self._volumes,
nv=self._use_gpu,
options=["--env", f"_JAVA_OPTIONS={self._java_options}"],
stream=True,
)
return executor


class SingularityRunnerBuilder:
    """Build and cache a single SingularityRunner instance."""

def __init__(self):
self._instance = None

def __call__(self, image, command, volumes_map, num_cores, use_gpu, max_heap_size, **_ignored):
if not self._instance:
self._instance = SingularityRunner(
image, command, volumes_map, num_cores, use_gpu, max_heap_size
)
return self._instance


class RunnerFactory:
    """Registry that maps runner keys (e.g. DOCKER, SINGULARITY) to builders."""

def __init__(self):
self._builders = {}

def register_builder(self, key, builder):
self._builders[key] = builder

def create(self, key, **kwargs):
builder = self._builders.get(key)
        if not builder:
            raise ValueError(f"Unknown runner type: {key}")
return builder(**kwargs)


class RunnerProvider(RunnerFactory):
def get(self, runner_type, **kwargs):
return self.create(runner_type, **kwargs)


runner_provider = RunnerProvider()
runner_provider.register_builder("DOCKER", DockerRunnerBuilder())
runner_provider.register_builder("SINGULARITY", SingularityRunnerBuilder())
128 changes: 88 additions & 40 deletions src/luna/pathology/cli/run_stardist_cell_detection.py
@@ -6,7 +6,7 @@
import pandas as pd
from loguru import logger

import docker
from luna.common.runners import runner_provider
from luna.common.utils import get_config, local_cache_urlpath, save_metadata, timed


@@ -19,6 +19,9 @@ def stardist_simple(
output_urlpath: str = ".",
debug_opts: str = "",
num_cores: int = 1,
image: str = "mskmind/qupath-stardist:0.4.3",
use_singularity: bool = False,
max_heap_size: str = "64G",
storage_options: dict = {},
output_storage_options: dict = {},
local_config: str = "",
@@ -32,8 +35,12 @@
image_type (str): qupath image type (BRIGHTFIELD_H_DAB)
output_urlpath (str): output url/path
debug_opts (str): debug options passed as arguments to groovy script
image (str): docker/singularity image
use_singularity (bool): use singularity instead of docker
max_heap_size (str): maximum heap size to pass to java options
storage_options (dict): storage options to pass to reading functions
output_storage_options (dict): storage options to pass to writing functions
local_config (str): local config yaml file

Returns:
pd.DataFrame: metadata about function call
@@ -56,6 +63,9 @@
config["output_urlpath"],
config["debug_opts"],
config["num_cores"],
config["image"],
config["use_singularity"],
config['max_heap_size'],
config["storage_options"],
config["output_storage_options"],
)
@@ -66,7 +76,7 @@
logger.info("generated cell data:")
logger.info(df)

output_geojson_file = Path(output_path) / f"cell_detections.geojson"
output_geojson_file = Path(output_path) / "cell_detections.geojson"

properties = {
"cell_objects": str(output_header_file),
@@ -91,6 +101,9 @@ def stardist_simple_main(
output_urlpath: str,
debug_opts: str,
num_cores: int,
image: str,
use_singularity: bool,
max_heap_size: str,
storage_options: dict,
output_storage_options: dict,
) -> pd.DataFrame:
@@ -103,6 +116,9 @@ def stardist_simple_main(
image_type (str): qupath image type (BRIGHTFIELD_H_DAB)
output_urlpath (str): output url/path
debug_opts (str): debug options passed as arguments to groovy script
image (str): docker/singularity image
use_singularity (bool): use singularity instead of docker
max_heap_size (str): maximum heap size to pass to java options
storage_options (dict): storage options to pass to reading functions
output_storage_options (dict): storage options to pass to writing functions

@@ -112,31 +128,39 @@ def stardist_simple_main(
fs, slide_path = fsspec.core.url_to_fs(slide_urlpath, **storage_options)
ofs, output_path = fsspec.core.url_to_fs(output_urlpath, **output_storage_options)

if ofs.protocol == 'file' and not ofs.exists(output_path):
ofs.mkdir(output_path)

runner_type = "DOCKER"
if use_singularity:
runner_type = "SINGULARITY"

slide_filename = Path(slide_path).name
docker_image = "mskmind/qupath-stardist:current"
command = f"QuPath script --image /inputs/{slide_filename} --args [cellSize={cell_expansion_size},imageType={image_type},{debug_opts}] /scripts/stardist_simple.groovy"
logger.info(f"Launching QuPath via {docker_image} ...")
logger.info(f"Launching QuPath via {runner_type}:{image} ...")
logger.info(
f"\tvolumes={slide_urlpath}:'/inputs/{slide_filename}', {slide_path}:'/output_dir'"
)
logger.info(f"\tnano_cpus={int(num_cores * 1e9)}")
logger.info(f"\timage='{docker_image}'")
logger.info(f"\timage='{image}'")
logger.info(f"\tcommand={command}")

client = docker.from_env()
container = client.containers.run(
volumes={
slide_path: {"bind": f"/inputs/{slide_filename}", "mode": "ro"},
output_path: {"bind": "/output_dir", "mode": "rw"},
},
nano_cpus=int(num_cores * 1e9),
image=docker_image,
command=command,
detach=True,
)
volumes_map = {
slide_path: f"/inputs/{slide_filename}",
output_path: "/output_dir",
}

for line in container.logs(stream=True):
print(line.decode(), end="")
runner_config = {
"image": image,
"command": command,
"num_cores": num_cores,
"max_heap_size": max_heap_size,
"volumes_map": volumes_map,
}
runner = runner_provider.get(runner_type, **runner_config)
executor = runner.run()
for line in executor:
print(line)

stardist_output = Path(output_path) / "cell_detections.tsv"

@@ -158,15 +182,21 @@ def stardist_cell_lymphocyte(
output_urlpath: str = ".",
num_cores: int = 1,
use_gpu: bool = False,
image: str = "mskmind/qupath-stardist:0.4.3",
use_singularity: bool = False,
max_heap_size: str = "64G",
storage_options: dict = {},
output_storage_options: dict = {},
):
"""Run stardist using qupath CLI

Args:
input_slide_image (str): path to slide image (virtual slide formats compatible with openslide, .svs, .tif, .scn, ...)
num_cores (int): Number of cores to use for CPU parallelization
slide_urlpath (str): url/path to slide image (virtual slide formats compatible with openslide, .svs, .tif, .scn, ...)
output_urlpath (str): output url/path
num_cores (int): Number of cores to use for CPU parallelization
use_gpu (bool): use GPU
use_singularity (bool): use singularity instead of docker
max_heap_size (str): maximum heap size to pass to java options
storage_options (dict): storage options to pass to reading functions
output_storage_options (dict): storage options to pass to writing functions

@@ -189,18 +219,20 @@ def stardist_cell_lymphocyte(
config["output_urlpath"],
config["num_cores"],
config["use_gpu"],
config["image"],
config["use_singularity"],
config["max_heap_size"],
config["storage_options"],
config["output_storage_options"],
)


with fs.open(output_header_file, "wb") as of:
df.to_parquet(of)

logger.info("generated cell data:")
logger.info(df)

output_geojson_file = Path(output_path) / f"cell_detections.geojson"
output_geojson_file = Path(output_path) / "cell_detections.geojson"

properties = {
"cell_objects": str(output_header_file),
@@ -223,15 +255,21 @@ def stardist_cell_lymphocyte_main(
output_urlpath: str,
num_cores: int,
use_gpu: bool = False,
image: str = "mskmind/qupath-stardist:0.4.3",
use_singularity: bool = False,
max_heap_size: str = "64G",
storage_options: dict = {},
output_storage_options: dict = {},
) -> pd.DataFrame:
"""Run stardist using qupath CLI

Args:
input_slide_image (str): path to slide image (virtual slide formats compatible with openslide, .svs, .tif, .scn, ...)
num_cores (int): Number of cores to use for CPU parallelization
slide_urlpath (str): url/path to slide image (virtual slide formats compatible with openslide, .svs, .tif, .scn, ...)
output_urlpath (str): output url/path
num_cores (int): Number of cores to use for CPU parallelization
use_gpu (bool): use GPU
use_singularity (bool): use singularity instead of docker
max_heap_size (str): maximum heap size to pass to java options
storage_options (dict): storage options to pass to reading functions

Returns:
@@ -241,35 +279,45 @@ def stardist_cell_lymphocyte_main(

ofs, output_path = fsspec.core.url_to_fs(output_urlpath, **output_storage_options)

if ofs.protocol == 'file' and not ofs.exists(output_path):
ofs.mkdir(output_path)

qupath_cmd = "QuPath-cpu"
if use_gpu:
qupath_cmd = "QuPath-gpu"

runner_type = "DOCKER"
if use_singularity:
runner_type = "SINGULARITY"


slide_filename = Path(slide_path).name
docker_image = "mskmind/qupath-tensorflow:latest"
command = f"{qupath_cmd} script --image /inputs/{slide_filename} /scripts/stardist_nuclei_and_lymphocytes.groovy"
logger.info("Launching docker container:")
logger.info(f"Launching {runner_type} container:")
logger.info(
f"\tvolumes={slide_path}:'/inputs/{slide_filename}', {output_path}:'/output_dir'"
)
logger.info(f"\tnano_cpus={int(num_cores * 1e9)}")
logger.info(f"\timage='{docker_image}'")
logger.info(f"\timage='{image}'")
logger.info(f"\tcommand={command}")

client = docker.from_env()
container = client.containers.run(
volumes={
slide_path: {"bind": f"/inputs/{slide_filename}", "mode": "ro"},
output_path: {"bind": "/output_dir", "mode": "rw"},
},
nano_cpus=int(num_cores * 1e9),
image=docker_image,
command=command,
detach=True,
)
volumes_map = {
slide_path: f"/inputs/{slide_filename}",
output_path: "/output_dir",
}

for line in container.logs(stream=True):
print(line.decode(), end="")
runner_config = {
"image": image,
"command": command,
"num_cores": num_cores,
"max_heap_size": max_heap_size,
"volumes_map": volumes_map,
"use_gpu": use_gpu,
}
runner = runner_provider.get(runner_type, **runner_config)
executor = runner.run()
for line in executor:
print(line)

stardist_output = Path(output_path) / "cell_detections.tsv"
