Skip to content

Commit

Permalink
Dev/main (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
rhoadesScholar committed Mar 11, 2024
2 parents 990bc16 + 408a74e commit b3cd009
Show file tree
Hide file tree
Showing 25 changed files with 173 additions and 159 deletions.
16 changes: 7 additions & 9 deletions dacapo/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def apply(
), "Either validation_dataset and criterion, or iteration must be provided."

# retrieving run
logger.info("Loading run %s", run_name)
print("Loading run %s", run_name)
config_store = create_config_store()
run_config = config_store.retrieve_run_config(run_name)
run = Run(run_config)
Expand All @@ -70,7 +70,7 @@ def apply(
# load weights
if iteration is None:
iteration = weights_store.retrieve_best(run_name, validation_dataset, criterion) # type: ignore
logger.info("Loading weights for iteration %i", iteration)
print("Loading weights for iteration %i", iteration)
weights_store.retrieve_weights(run_name, iteration)

if parameters is None:
Expand All @@ -89,9 +89,7 @@ def apply(
raise ValueError(
"validation_dataset must be a dataset name or a Dataset object, or parameters must be provided explicitly."
)
logger.info(
"Finding best parameters for validation dataset %s", _validation_dataset
)
print("Finding best parameters for validation dataset %s", _validation_dataset)
parameters = run.task.evaluator.get_overall_best_parameters(
_validation_dataset, criterion
)
Expand Down Expand Up @@ -151,7 +149,7 @@ def apply(
output_container, f"output_{run_name}_{iteration}_{parameters}"
)

logger.info(
print(
"Applying best results from run %s at iteration %i to dataset %s",
run.name,
iteration,
Expand Down Expand Up @@ -186,7 +184,7 @@ def apply_run(
"""Apply the model to a dataset. If roi is None, the whole input dataset is used. Assumes model is already loaded."""

# render prediction dataset
logger.info("Predicting on dataset %s", prediction_array_identifier)
print("Predicting on dataset %s", prediction_array_identifier)
predict(
run.name,
iteration,
Expand All @@ -200,10 +198,10 @@ def apply_run(
)

# post-process the output
logger.info("Post-processing output to dataset %s", output_array_identifier)
print("Post-processing output to dataset %s", output_array_identifier)
post_processor = run.task.post_processor
post_processor.set_prediction(prediction_array_identifier)
post_processor.process(parameters, output_array_identifier, num_workers=num_workers)

logger.info("Done")
print("Done")
return
2 changes: 1 addition & 1 deletion dacapo/blockwise/argmax_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def start_worker(
client = daisy.Client()

while True:
logger.info("getting block")
print("getting block")
with client.acquire_block() as block:
if block is None:
break
Expand Down
18 changes: 8 additions & 10 deletions dacapo/blockwise/empanada_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def orthoplane_inference(engine, volume):
# report instances per class
for tracker in trackers:
class_id = tracker.class_id
logger.info(
print(
f"Class {class_id}, axis {axis_name}, has {len(tracker.instances.keys())} instances"
)

Expand Down Expand Up @@ -153,7 +153,7 @@ def start_postprocess_worker(*args):
min_extent=min_extent,
dtype=engine.dtype,
):
logger.info(f"Yielding {class_name} volume of shape {vol.shape}")
print(f"Yielding {class_name} volume of shape {vol.shape}")
yield vol, class_name, tracker

def start_consensus_worker(trackers_dict):
Expand All @@ -166,7 +166,7 @@ def start_consensus_worker(trackers_dict):
min_extent=min_extent,
dtype=engine.dtype,
):
logger.info(f"Yielding {class_name} volume of shape {vol.shape}")
print(f"Yielding {class_name} volume of shape {vol.shape}")
yield vol, class_name, tracker

# verify that the image doesn't have extraneous channel dimensions
Expand All @@ -182,7 +182,7 @@ def start_consensus_worker(trackers_dict):
else:
raise Exception(f"Image volume must be 3D, got image of shape {shape}")

logger.info(
print(
f"Got 4D image of shape {shape}, extracted single channel of size {image.shape}"
)

Expand Down Expand Up @@ -210,7 +210,7 @@ def stack_postprocessing(

# create the final instance segmentations
for class_id, class_name in class_names.items():
logger.info(f"Creating stack segmentation for class {class_name}...")
print(f"Creating stack segmentation for class {class_name}...")

class_tracker = get_axis_trackers_by_class(trackers, class_id)[0]
shape3d = class_tracker.shape3d
Expand All @@ -224,7 +224,7 @@ def stack_postprocessing(
filters.remove_small_objects(stack_tracker, min_size=min_size)
filters.remove_pancakes(stack_tracker, min_span=min_extent)

logger.info(f"Total {class_name} objects {len(stack_tracker.instances.keys())}")
print(f"Total {class_name} objects {len(stack_tracker.instances.keys())}")

# decode and fill the instances
stack_vol = np.zeros(shape3d, dtype=dtype)
Expand Down Expand Up @@ -254,7 +254,7 @@ def tracker_consensus(
# create the final instance segmentations
for class_id, class_name in class_names.items():
# get the relevant trackers for the class_label
logger.info(f"Creating consensus segmentation for class {class_name}...")
print(f"Creating consensus segmentation for class {class_name}...")

class_trackers = get_axis_trackers_by_class(trackers, class_id)
shape3d = class_trackers[0].shape3d
Expand All @@ -271,9 +271,7 @@ def tracker_consensus(
class_trackers, pixel_vote_thr
)

logger.info(
f"Total {class_name} objects {len(consensus_tracker.instances.keys())}"
)
print(f"Total {class_name} objects {len(consensus_tracker.instances.keys())}")

# decode and fill the instances
consensus_vol = np.zeros(shape3d, dtype=dtype)
Expand Down
12 changes: 2 additions & 10 deletions dacapo/blockwise/predict_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ def start_worker(
input_size = input_voxel_size * input_shape
output_size = output_voxel_size * model.compute_output_shape(input_shape)[1]

logger.info(
"Predicting with input size %s, output size %s", input_size, output_size
)
print("Predicting with input size %s, output size %s", input_size, output_size)

# create gunpowder keys

Expand Down Expand Up @@ -167,12 +165,6 @@ def start_worker(
output_size,
voxel_size=output_voxel_size,
)
# # use daisy requests to run pipeline
# pipeline += gp.DaisyRequestBlocks(
# reference=request,
# roi_map={raw: "read_roi", prediction: "write_roi"},
# num_workers=1,
# )

daisy_client = daisy.Client()

Expand All @@ -181,7 +173,7 @@ def start_worker(
if block is None:
return

logger.info("Processing block %s", block)
print("Processing block %s", block)

chunk_request = request.copy()
chunk_request[raw].roi = block.read_roi
Expand Down
10 changes: 9 additions & 1 deletion dacapo/blockwise/relabel_worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from glob import glob
import os
import sys
from time import sleep
import daisy
from dacapo.compute_context import create_compute_context
from dacapo.store.array_store import LocalArrayIdentifier
Expand Down Expand Up @@ -54,7 +55,14 @@ def start_worker(
if block is None:
break

relabel_in_block(array_out, nodes, components, block)
try:
relabel_in_block(array_out, nodes, components, block)
except OSError as e:
logging.error(
f"Failed to relabel block {block.write_roi}: {e}. Trying again."
)
sleep(1)
relabel_in_block(array_out, nodes, components, block)


def relabel_in_block(array_out, old_values, new_values, block):
Expand Down
135 changes: 75 additions & 60 deletions dacapo/blockwise/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
import shutil
import tempfile
import time
import daisy
Expand All @@ -7,6 +8,7 @@
import yaml

from dacapo.blockwise import DaCapoBlockwiseTask
from dacapo import Options
import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,7 +83,7 @@ def run_blockwise(
**kwargs,
)

logger.info("Running blockwise with worker_file: ", worker_file)
print("Running blockwise with worker_file: ", worker_file)
success = daisy.run_blockwise([task])
return success

Expand All @@ -96,7 +98,6 @@ def segment_blockwise(
max_retries: int = 2,
timeout=None,
upstream_tasks=None,
tmp_prefix="tmp",
*args,
**kwargs,
):
Expand Down Expand Up @@ -133,6 +134,14 @@ def segment_blockwise(
(either due to failed post check or application crashes or network
failure)
timeout (``int``):
The maximum time in seconds to wait for a worker to complete a task.
upstream_tasks (``List``):
List of upstream tasks.
*args:
Additional positional arguments to pass to ``worker_function``.
Expand All @@ -145,61 +154,67 @@ def segment_blockwise(
``Bool``.
"""
with tempfile.TemporaryDirectory(prefix=tmp_prefix) as tmpdir:
logger.info(
"Running blockwise segmentation, with segment_function_file: ",
segment_function_file,
" in temp directory: ",
tmpdir,
)
# write parameters to tmpdir
if "parameters" in kwargs:
with open(Path(tmpdir, "parameters.yaml"), "w") as f:
yaml.dump(kwargs.pop("parameters"), f)

# Make the task
task = DaCapoBlockwiseTask(
str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")),
total_roi.grow(context, context),
read_roi,
write_roi,
num_workers,
max_retries,
timeout,
upstream_tasks,
tmpdir=tmpdir,
function_path=str(segment_function_file),
*args,
**kwargs,
)
logger.info(
"Running blockwise segmentation with worker_file: ",
str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")),
)
success = daisy.run_blockwise([task])

# give a second for the fist task to finish
time.sleep(1)
read_roi = write_roi

# Make the task
task = DaCapoBlockwiseTask(
str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")),
total_roi,
read_roi,
write_roi,
num_workers,
max_retries,
timeout,
upstream_tasks,
tmpdir=tmpdir,
*args,
**kwargs,
)
logger.info(
"Running blockwise relabeling with worker_file: ",
str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")),
)

success = success and daisy.run_blockwise([task])
return success
options = Options.instance()
if not options.runs_base_dir.exists():
options.runs_base_dir.mkdir(parents=True)
tmpdir = tempfile.mkdtemp(dir=options.runs_base_dir)

print(
"Running blockwise segmentation, with segment_function_file: ",
segment_function_file,
" in temp directory: ",
tmpdir,
)
# write parameters to tmpdir
if "parameters" in kwargs:
with open(Path(tmpdir, "parameters.yaml"), "w") as f:
yaml.dump(kwargs.pop("parameters"), f)

# Make the task
task = DaCapoBlockwiseTask(
str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")),
total_roi.grow(context, context),
read_roi,
write_roi,
num_workers,
max_retries,
timeout,
upstream_tasks,
tmpdir=tmpdir,
function_path=str(segment_function_file),
*args,
**kwargs,
)
print(
"Running blockwise segmentation with worker_file: ",
str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")),
)
success = daisy.run_blockwise([task])

# give a second for the fist task to finish
time.sleep(1)
read_roi = write_roi

# Make the task
task = DaCapoBlockwiseTask(
str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")),
total_roi,
read_roi,
write_roi,
num_workers,
max_retries,
timeout,
upstream_tasks,
tmpdir=tmpdir,
*args,
**kwargs,
)
print(
"Running blockwise relabeling with worker_file: ",
str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")),
)

success = success and daisy.run_blockwise([task])

shutil.rmtree(tmpdir, ignore_errors=True)
return success
Loading

0 comments on commit b3cd009

Please sign in to comment.