From 42e7a108df5381471a0b77ce1c499fea9247a86c Mon Sep 17 00:00:00 2001 From: rhoadesScholar Date: Mon, 11 Mar 2024 10:42:44 -0400 Subject: [PATCH 1/3] =?UTF-8?q?fix:=20=E2=9A=A1=EF=B8=8F=20Simplified=20tm?= =?UTF-8?q?pdir=20management=20for=20cluster.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dacapo/blockwise/scheduler.py | 133 ++++++++++++++++------------- dacapo/blockwise/segment_worker.py | 14 +-- dacapo/cli.py | 3 - dacapo/options.py | 2 +- 4 files changed, 84 insertions(+), 68 deletions(-) diff --git a/dacapo/blockwise/scheduler.py b/dacapo/blockwise/scheduler.py index 39b9ef6b..4b4ff44e 100644 --- a/dacapo/blockwise/scheduler.py +++ b/dacapo/blockwise/scheduler.py @@ -1,4 +1,5 @@ from pathlib import Path +import shutil import tempfile import time import daisy @@ -7,6 +8,7 @@ import yaml from dacapo.blockwise import DaCapoBlockwiseTask +from dacapo import Options import logging logger = logging.getLogger(__name__) @@ -96,7 +98,6 @@ def segment_blockwise( max_retries: int = 2, timeout=None, upstream_tasks=None, - tmp_prefix="tmp", *args, **kwargs, ): @@ -133,6 +134,14 @@ def segment_blockwise( (either due to failed post check or application crashes or network failure) + timeout (``int``): + + The maximum time in seconds to wait for a worker to complete a task. + + upstream_tasks (``List``): + + List of upstream tasks. + *args: Additional positional arguments to pass to ``worker_function``. @@ -145,61 +154,67 @@ def segment_blockwise( ``Bool``. """ - with tempfile.TemporaryDirectory(prefix=tmp_prefix) as tmpdir: - logger.info( - "Running blockwise segmentation, with segment_function_file: ", - segment_function_file, - " in temp directory: ", - tmpdir, - ) - # write parameters to tmpdir - if "parameters" in kwargs: - with open(Path(tmpdir, "parameters.yaml"), "w") as f: - yaml.dump(kwargs.pop("parameters"), f) - - # Make the task - task = DaCapoBlockwiseTask( - str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")), - total_roi.grow(context, context), - read_roi, - write_roi, - num_workers, - max_retries, - timeout, - upstream_tasks, - tmpdir=tmpdir, - function_path=str(segment_function_file), - *args, - **kwargs, - ) - logger.info( - "Running blockwise segmentation with worker_file: ", - str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")), - ) - success = daisy.run_blockwise([task]) - - # give a second for the fist task to finish - time.sleep(1) - read_roi = write_roi - - # Make the task - task = DaCapoBlockwiseTask( - str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")), - total_roi, - read_roi, - write_roi, - num_workers, - max_retries, - timeout, - upstream_tasks, - tmpdir=tmpdir, - *args, - **kwargs, - ) - logger.info( - "Running blockwise relabeling with worker_file: ", - str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")), - ) - - success = success and daisy.run_blockwise([task]) - return success + options = Options.instance() + if not options.runs_base_dir.exists(): + options.runs_base_dir.mkdir(parents=True) + tmpdir = tempfile.mkdtemp(dir=options.runs_base_dir) + + logger.info( + "Running blockwise segmentation, with segment_function_file: ", + segment_function_file, + " in temp directory: ", + tmpdir, + ) + # write parameters to tmpdir + if "parameters" in kwargs: + with open(Path(tmpdir, "parameters.yaml"), "w") as f: + yaml.dump(kwargs.pop("parameters"), f) + + # Make the task + task = DaCapoBlockwiseTask( + str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")), + total_roi.grow(context, context), + read_roi, + write_roi, + num_workers, + max_retries, + timeout, + upstream_tasks, + tmpdir=tmpdir, + function_path=str(segment_function_file), + *args, + **kwargs, + ) + logger.info( + "Running blockwise segmentation with worker_file: ", + str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")), + ) + success = daisy.run_blockwise([task]) + + # give a second for the fist task to finish + time.sleep(1) + read_roi = write_roi + + # Make the task + task = DaCapoBlockwiseTask( + str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")), + total_roi, + read_roi, + write_roi, + num_workers, + max_retries, + timeout, + upstream_tasks, + tmpdir=tmpdir, + *args, + **kwargs, + ) + logger.info( + "Running blockwise relabeling with worker_file: ", + str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")), + ) + + success = success and daisy.run_blockwise([task]) + + shutil.rmtree(tmpdir, ignore_errors=True) + return success diff --git a/dacapo/blockwise/segment_worker.py b/dacapo/blockwise/segment_worker.py index fb1b6423..0d90a143 100644 --- a/dacapo/blockwise/segment_worker.py +++ b/dacapo/blockwise/segment_worker.py @@ -170,11 +170,15 @@ def start_worker( nodes = np.unique(edges) logger.info(f"Writing ids to {os.path.join(tmpdir, 'block_%d.npz')}") - np.savez_compressed( - os.path.join(tmpdir, "block_%d.npz" % block.block_id[1]), - nodes=nodes, - edges=edges, - ) + assert os.path.exists(tmpdir) + with open( + os.path.join(tmpdir, f"block_{block.block_id[1]}.npz"), "wb" + ) as f: + np.savez_compressed( + f, + nodes=nodes, + edges=edges, + ) def spawn_worker( diff --git a/dacapo/cli.py b/dacapo/cli.py index 798f5ae7..29541a96 100644 --- a/dacapo/cli.py +++ b/dacapo/cli.py @@ -329,7 +329,6 @@ def run_blockwise( @click.option("-nw", "--num_workers", type=int, default=16) @click.option("-mr", "--max_retries", type=int, default=2) @click.option("-t", "--timeout", type=int, default=None) -@click.option("-tp", "--tmp_prefix", type=str, default="tmp") @click.option("-ow", "--overwrite", is_flag=True, default=True) @click.option("-co", "--channels_out", type=int, default=None) @click.pass_context @@ -347,7 +346,6 @@ def segment_blockwise( num_workers: int = 16, max_retries: int = 2, timeout=None, - tmp_prefix: str = "tmp", overwrite: bool = True, channels_out: Optional[int] = None, *args, @@ -401,7 +399,6 @@ def segment_blockwise( num_workers=num_workers, max_retries=max_retries, timeout=timeout, - tmp_prefix=tmp_prefix, parameters=parameters, *args, **kwargs, diff --git a/dacapo/options.py b/dacapo/options.py index c54892f5..9d6055fc 100644 --- a/dacapo/options.py +++ b/dacapo/options.py @@ -24,7 +24,7 @@ class DaCapoConfig: runs_base_dir: Path = attr.ib( default=Path(expanduser("~/.dacapo")), metadata={ - "help_text": "The path at DaCapo will use for reading and writing any necessary data." + "help_text": "The path at DaCapo will use for reading and writing any necessary data. This should be an absolute path." }, ) compute_context: dict = attr.ib( From ab0a549047aa63211a406028452ed5e4294e1def Mon Sep 17 00:00:00 2001 From: rhoadesScholar Date: Mon, 11 Mar 2024 10:56:20 -0400 Subject: [PATCH 2/3] =?UTF-8?q?style:=20=E2=9A=A1=EF=B8=8F=20Update=20logg?= =?UTF-8?q?ing=20statements=20to=20use=20print=20instead=20of=20logger.inf?= =?UTF-8?q?o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dacapo/apply.py | 16 +++++++--------- dacapo/blockwise/argmax_worker.py | 2 +- dacapo/blockwise/empanada_function.py | 18 ++++++++---------- dacapo/blockwise/predict_worker.py | 6 ++---- dacapo/blockwise/scheduler.py | 8 ++++---- dacapo/blockwise/segment_worker.py | 14 ++++++-------- dacapo/blockwise/threshold_worker.py | 2 +- dacapo/cli.py | 8 ++++---- .../datasplits/datasets/arrays/concat_array.py | 2 +- dacapo/experiments/starts/start.py | 2 +- .../binary_segmentation_evaluator.py | 2 +- .../experiments/trainers/gunpowder_trainer.py | 2 +- dacapo/gp/elastic_augment_fuse.py | 12 ++++++------ dacapo/predict.py | 8 +++----- dacapo/store/file_config_store.py | 2 +- dacapo/store/file_stats_store.py | 6 +++--- dacapo/store/local_array_store.py | 2 +- dacapo/store/local_weights_store.py | 8 ++++---- dacapo/store/mongo_config_store.py | 2 +- dacapo/store/mongo_stats_store.py | 6 +++--- dacapo/train.py | 12 ++++++------ dacapo/validate.py | 14 ++++++-------- 22 files changed, 71 insertions(+), 83 deletions(-) diff --git a/dacapo/apply.py b/dacapo/apply.py index 9e6006c9..60f75460 100644 --- a/dacapo/apply.py +++ b/dacapo/apply.py @@ -59,7 +59,7 @@ def apply( ), "Either validation_dataset and criterion, or iteration must be provided." # retrieving run - logger.info("Loading run %s", run_name) + print("Loading run %s", run_name) config_store = create_config_store() run_config = config_store.retrieve_run_config(run_name) run = Run(run_config) @@ -70,7 +70,7 @@ def apply( # load weights if iteration is None: iteration = weights_store.retrieve_best(run_name, validation_dataset, criterion) # type: ignore - logger.info("Loading weights for iteration %i", iteration) + print("Loading weights for iteration %i", iteration) weights_store.retrieve_weights(run_name, iteration) if parameters is None: @@ -89,9 +89,7 @@ def apply( raise ValueError( "validation_dataset must be a dataset name or a Dataset object, or parameters must be provided explicitly." ) - logger.info( - "Finding best parameters for validation dataset %s", _validation_dataset - ) + print("Finding best parameters for validation dataset %s", _validation_dataset) parameters = run.task.evaluator.get_overall_best_parameters( _validation_dataset, criterion ) @@ -151,7 +149,7 @@ def apply( output_container, f"output_{run_name}_{iteration}_{parameters}" ) - logger.info( + print( "Applying best results from run %s at iteration %i to dataset %s", run.name, iteration, @@ -186,7 +184,7 @@ def apply_run( """Apply the model to a dataset. If roi is None, the whole input dataset is used. Assumes model is already loaded.""" # render prediction dataset - logger.info("Predicting on dataset %s", prediction_array_identifier) + print("Predicting on dataset %s", prediction_array_identifier) predict( run.name, iteration, @@ -200,10 +198,10 @@ def apply_run( ) # post-process the output - logger.info("Post-processing output to dataset %s", output_array_identifier) + print("Post-processing output to dataset %s", output_array_identifier) post_processor = run.task.post_processor post_processor.set_prediction(prediction_array_identifier) post_processor.process(parameters, output_array_identifier, num_workers=num_workers) - logger.info("Done") + print("Done") return diff --git a/dacapo/blockwise/argmax_worker.py b/dacapo/blockwise/argmax_worker.py index 77d932f3..59a17d75 100644 --- a/dacapo/blockwise/argmax_worker.py +++ b/dacapo/blockwise/argmax_worker.py @@ -61,7 +61,7 @@ def start_worker( client = daisy.Client() while True: - logger.info("getting block") + print("getting block") with client.acquire_block() as block: if block is None: break diff --git a/dacapo/blockwise/empanada_function.py b/dacapo/blockwise/empanada_function.py index 911d8dbc..4175f857 100644 --- a/dacapo/blockwise/empanada_function.py +++ b/dacapo/blockwise/empanada_function.py @@ -74,7 +74,7 @@ def orthoplane_inference(engine, volume): # report instances per class for tracker in trackers: class_id = tracker.class_id - logger.info( + print( f"Class {class_id}, axis {axis_name}, has {len(tracker.instances.keys())} instances" ) @@ -153,7 +153,7 @@ def start_postprocess_worker(*args): min_extent=min_extent, dtype=engine.dtype, ): - logger.info(f"Yielding {class_name} volume of shape {vol.shape}") + print(f"Yielding {class_name} volume of shape {vol.shape}") yield vol, class_name, tracker def start_consensus_worker(trackers_dict): @@ -166,7 +166,7 @@ def start_consensus_worker(trackers_dict): min_extent=min_extent, dtype=engine.dtype, ): - logger.info(f"Yielding {class_name} volume of shape {vol.shape}") + print(f"Yielding {class_name} volume of shape {vol.shape}") yield vol, class_name, tracker # verify that the image doesn't have extraneous channel dimensions @@ -182,7 +182,7 @@ def start_consensus_worker(trackers_dict): else: raise Exception(f"Image volume must be 3D, got image of shape {shape}") - logger.info( + print( f"Got 4D image of shape {shape}, extracted single channel of size {image.shape}" ) @@ -210,7 +210,7 @@ def stack_postprocessing( # create the final instance segmentations for class_id, class_name in class_names.items(): - logger.info(f"Creating stack segmentation for class {class_name}...") + print(f"Creating stack segmentation for class {class_name}...") class_tracker = get_axis_trackers_by_class(trackers, class_id)[0] shape3d = class_tracker.shape3d @@ -224,7 +224,7 @@ def stack_postprocessing( filters.remove_small_objects(stack_tracker, min_size=min_size) filters.remove_pancakes(stack_tracker, min_span=min_extent) - logger.info(f"Total {class_name} objects {len(stack_tracker.instances.keys())}") + print(f"Total {class_name} objects {len(stack_tracker.instances.keys())}") # decode and fill the instances stack_vol = np.zeros(shape3d, dtype=dtype) @@ -254,7 +254,7 @@ def tracker_consensus( # create the final instance segmentations for class_id, class_name in class_names.items(): # get the relevant trackers for the class_label - logger.info(f"Creating consensus segmentation for class {class_name}...") + print(f"Creating consensus segmentation for class {class_name}...") class_trackers = get_axis_trackers_by_class(trackers, class_id) shape3d = class_trackers[0].shape3d @@ -271,9 +271,7 @@ def tracker_consensus( class_trackers, pixel_vote_thr ) - logger.info( - f"Total {class_name} objects {len(consensus_tracker.instances.keys())}" - ) + print(f"Total {class_name} objects {len(consensus_tracker.instances.keys())}") # decode and fill the instances consensus_vol = np.zeros(shape3d, dtype=dtype) diff --git a/dacapo/blockwise/predict_worker.py b/dacapo/blockwise/predict_worker.py index 462d1736..68a2eec1 100644 --- a/dacapo/blockwise/predict_worker.py +++ b/dacapo/blockwise/predict_worker.py @@ -101,9 +101,7 @@ def start_worker( input_size = input_voxel_size * input_shape output_size = output_voxel_size * model.compute_output_shape(input_shape)[1] - logger.info( - "Predicting with input size %s, output size %s", input_size, output_size - ) + print("Predicting with input size %s, output size %s", input_size, output_size) # create gunpowder keys @@ -181,7 +179,7 @@ def start_worker( if block is None: return - logger.info("Processing block %s", block) + print("Processing block %s", block) chunk_request = request.copy() chunk_request[raw].roi = block.read_roi diff --git a/dacapo/blockwise/scheduler.py b/dacapo/blockwise/scheduler.py index 4b4ff44e..89bf0ba4 100644 --- a/dacapo/blockwise/scheduler.py +++ b/dacapo/blockwise/scheduler.py @@ -83,7 +83,7 @@ def run_blockwise( **kwargs, ) - logger.info("Running blockwise with worker_file: ", worker_file) + print("Running blockwise with worker_file: ", worker_file) success = daisy.run_blockwise([task]) return success @@ -159,7 +159,7 @@ def segment_blockwise( options.runs_base_dir.mkdir(parents=True) tmpdir = tempfile.mkdtemp(dir=options.runs_base_dir) - logger.info( + print( "Running blockwise segmentation, with segment_function_file: ", segment_function_file, " in temp directory: ", @@ -185,7 +185,7 @@ def segment_blockwise( *args, **kwargs, ) - logger.info( + print( "Running blockwise segmentation with worker_file: ", str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")), ) @@ -209,7 +209,7 @@ def segment_blockwise( *args, **kwargs, ) - logger.info( + print( "Running blockwise relabeling with worker_file: ", str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")), ) diff --git a/dacapo/blockwise/segment_worker.py b/dacapo/blockwise/segment_worker.py index 0d90a143..1d01ec0c 100644 --- a/dacapo/blockwise/segment_worker.py +++ b/dacapo/blockwise/segment_worker.py @@ -60,21 +60,21 @@ def start_worker( function_path (str): The path to the segment function. """ - logger.info("Starting worker") + print("Starting worker") # get arrays input_array_identifier = LocalArrayIdentifier(Path(input_container), input_dataset) - logger.info(f"Opening input array {input_array_identifier}") + print(f"Opening input array {input_array_identifier}") input_array = ZarrArray.open_from_array_identifier(input_array_identifier) output_array_identifier = LocalArrayIdentifier( Path(output_container), output_dataset ) - logger.info(f"Opening output array {output_array_identifier}") + print(f"Opening output array {output_array_identifier}") output_array = ZarrArray.open_from_array_identifier(output_array_identifier) # Load segment function function_name = Path(function_path).stem - logger.info(f"Loading segment function from {str(function_path)}") + print(f"Loading segment function from {str(function_path)}") function = SourceFileLoader(function_name, str(function_path)).load_module() segment_function = function.segment_function @@ -86,9 +86,7 @@ def start_worker( # load parameters saved in tmpdir if os.path.exists(os.path.join(tmpdir, "parameters.yaml")): - logger.info( - f"Loading parameters from {os.path.join(tmpdir, 'parameters.yaml')}" - ) + print(f"Loading parameters from {os.path.join(tmpdir, 'parameters.yaml')}") with open(os.path.join(tmpdir, "parameters.yaml"), "r") as f: parameters.update(yaml.safe_load(f)) @@ -169,7 +167,7 @@ def start_worker( edges = unique_pairs[non_zero_filter] nodes = np.unique(edges) - logger.info(f"Writing ids to {os.path.join(tmpdir, 'block_%d.npz')}") + print(f"Writing ids to {os.path.join(tmpdir, 'block_%d.npz')}") assert os.path.exists(tmpdir) with open( os.path.join(tmpdir, f"block_{block.block_id[1]}.npz"), "wb" diff --git a/dacapo/blockwise/threshold_worker.py b/dacapo/blockwise/threshold_worker.py index a9ab85d0..3ff08c1e 100644 --- a/dacapo/blockwise/threshold_worker.py +++ b/dacapo/blockwise/threshold_worker.py @@ -63,7 +63,7 @@ def start_worker( client = daisy.Client() while True: - logger.info("getting block") + print("getting block") with client.acquire_block() as block: if block is None: break diff --git a/dacapo/cli.py b/dacapo/cli.py index 29541a96..22cd453a 100644 --- a/dacapo/cli.py +++ b/dacapo/cli.py @@ -384,7 +384,7 @@ def segment_blockwise( np.uint64, overwrite=overwrite, ) - logger.info( + print( f"Created output array {output_array_identifier.container}:{output_array_identifier.dataset} with ROI {_total_roi}." ) @@ -406,7 +406,7 @@ def segment_blockwise( def unpack_ctx(ctx): - # logger.info(ctx.args) + # print(ctx.args) kwargs = { ctx.args[i].lstrip("-"): ctx.args[i + 1] for i in range(0, len(ctx.args), 2) } @@ -415,8 +415,8 @@ def unpack_ctx(ctx): kwargs[k] = int(v) elif v.replace(".", "").isnumeric(): kwargs[k] = float(v) - logger.info(f"{k}: {kwargs[k]}") - # logger.info(f"{type(k)}: {k} --> {type(kwargs[k])} {kwargs[k]}") + print(f"{k}: {kwargs[k]}") + # print(f"{type(k)}: {k} --> {type(kwargs[k])} {kwargs[k]}") return kwargs diff --git a/dacapo/experiments/datasplits/datasets/arrays/concat_array.py b/dacapo/experiments/datasplits/datasets/arrays/concat_array.py index 7fd91e08..37cf650f 100644 --- a/dacapo/experiments/datasplits/datasets/arrays/concat_array.py +++ b/dacapo/experiments/datasplits/datasets/arrays/concat_array.py @@ -121,7 +121,7 @@ def __getitem__(self, roi: Roi) -> np.ndarray: axis=0, ) if concatenated.shape[0] == 1: - logger.info( + print( f"Concatenated array has only one channel: {self.name} {concatenated.shape}" ) return concatenated diff --git a/dacapo/experiments/starts/start.py b/dacapo/experiments/starts/start.py index da7badbf..4287eb6c 100644 --- a/dacapo/experiments/starts/start.py +++ b/dacapo/experiments/starts/start.py @@ -14,7 +14,7 @@ def initialize_weights(self, model): weights_store = create_weights_store() weights = weights_store._retrieve_weights(self.run, self.criterion) - logger.info(f"loading weights from run {self.run}, criterion: {self.criterion}") + print(f"loading weights from run {self.run}, criterion: {self.criterion}") # load the model weights (taken from torch load_state_dict source) try: model.load_state_dict(weights.model) diff --git a/dacapo/experiments/tasks/evaluators/binary_segmentation_evaluator.py b/dacapo/experiments/tasks/evaluators/binary_segmentation_evaluator.py index fafea82a..bf5d6d98 100644 --- a/dacapo/experiments/tasks/evaluators/binary_segmentation_evaluator.py +++ b/dacapo/experiments/tasks/evaluators/binary_segmentation_evaluator.py @@ -41,7 +41,7 @@ def evaluate(self, output_array_identifier, evaluation_array): output_array = ZarrArray.open_from_array_identifier(output_array_identifier) evaluation_data = evaluation_array[evaluation_array.roi] output_data = output_array[output_array.roi] - logger.info( + print( f"Evaluating binary segmentations on evaluation_data of shape: {evaluation_data.shape}" ) assert ( diff --git a/dacapo/experiments/trainers/gunpowder_trainer.py b/dacapo/experiments/trainers/gunpowder_trainer.py index 46379acf..57166beb 100644 --- a/dacapo/experiments/trainers/gunpowder_trainer.py +++ b/dacapo/experiments/trainers/gunpowder_trainer.py @@ -196,7 +196,7 @@ def build_batch_provider(self, datasets, model, task, snapshot_container=None): def iterate(self, num_iterations, model, optimizer, device): t_start_fetch = time.time() - logger.info("Starting iteration!") + print("Starting iteration!") for iteration in range(self.iteration, self.iteration + num_iterations): raw, gt, target, weight, mask = self.next() diff --git a/dacapo/gp/elastic_augment_fuse.py b/dacapo/gp/elastic_augment_fuse.py index 3de3ed33..b070d20a 100644 --- a/dacapo/gp/elastic_augment_fuse.py +++ b/dacapo/gp/elastic_augment_fuse.py @@ -82,9 +82,9 @@ def _create_rotation_transformation(shape, angle, subsample=1, voxel_size=None): # rotate control points center = np.array([0.5 * (d - 1) * vs for d, vs in zip(shape, voxel_size)]) - # logger.info("Creating rotation transformation with:") - # logger.info("\tangle : " + str(angle)) - # logger.info("\tcenter: " + str(center)) + # print("Creating rotation transformation with:") + # print("\tangle : " + str(angle)) + # print("\tcenter: " + str(center)) control_point_offsets = np.zeros((dims,) + control_points, dtype=np.float32) for control_point in np.ndindex(control_points): @@ -116,9 +116,9 @@ def _create_uniform_3d_transformation(shape, rotation, subsample=1, voxel_size=N # rotate control points center = np.array([0.5 * (d - 1) * vs for d, vs in zip(shape, voxel_size)]) - # logger.info("Creating rotation transformation with:") - # logger.info("\tangle : " + str(angle)) - # logger.info("\tcenter: " + str(center)) + # print("Creating rotation transformation with:") + # print("\tangle : " + str(angle)) + # print("\tcenter: " + str(center)) control_point_offsets = np.zeros((dims,) + control_points, dtype=np.float32) for control_point in np.ndindex(control_points): diff --git a/dacapo/predict.py b/dacapo/predict.py index 1ea363ea..4a5fa9eb 100644 --- a/dacapo/predict.py +++ b/dacapo/predict.py @@ -106,11 +106,9 @@ def predict( if isinstance(output_dtype, str): output_dtype = np.dtype(output_dtype) - logger.info( - "Predicting with input size %s, output size %s", input_size, output_size - ) + print("Predicting with input size %s, output size %s", input_size, output_size) - logger.info("Total input ROI: %s, output ROI: %s", _input_roi, output_roi) + print("Total input ROI: %s, output ROI: %s", _input_roi, output_roi) # prepare prediction dataset axes = ["c"] + [axis for axis in raw_array.axes if axis != "c"] @@ -126,7 +124,7 @@ def predict( # run blockwise prediction worker_file = str(Path(Path(dacapo.blockwise.__file__).parent, "predict_worker.py")) - logger.info("Running blockwise prediction with worker_file: ", worker_file) + print("Running blockwise prediction with worker_file: ", worker_file) run_blockwise( worker_file=worker_file, total_roi=_input_roi, diff --git a/dacapo/store/file_config_store.py b/dacapo/store/file_config_store.py index ae88ebdd..aaad9b7f 100644 --- a/dacapo/store/file_config_store.py +++ b/dacapo/store/file_config_store.py @@ -20,7 +20,7 @@ class FileConfigStore(ConfigStore): """ def __init__(self, path): - logger.info("Creating FileConfigStore:\n\tpath : %s", path) + print("Creating FileConfigStore:\n\tpath : %s", path) self.path = Path(path) diff --git a/dacapo/store/file_stats_store.py b/dacapo/store/file_stats_store.py index b3ce77f3..d367e842 100644 --- a/dacapo/store/file_stats_store.py +++ b/dacapo/store/file_stats_store.py @@ -17,7 +17,7 @@ class FileStatsStore(StatsStore): """ def __init__(self, path): - logger.info("Creating MongoStatsStore:\n\tpath : %s", path) + print("Creating MongoStatsStore:\n\tpath : %s", path) self.path = Path(path) @@ -35,7 +35,7 @@ def store_training_stats(self, run_name, stats): if stats.trained_until() > existing_stats.trained_until(): # current stats go further than the one in DB store_from_iteration = existing_stats.trained_until() - logger.info( + print( "Updating training stats of run %s after iteration %d", run_name, store_from_iteration, @@ -65,7 +65,7 @@ def store_validation_iteration_scores(self, run_name, scores): self.__delete_validation_iteration_scores(run_name) if store_from_iteration > 0: - logger.info( + print( "Updating validation scores of run %s after iteration " "%d", run_name, store_from_iteration, diff --git a/dacapo/store/local_array_store.py b/dacapo/store/local_array_store.py index 73994d98..1c6e80c5 100644 --- a/dacapo/store/local_array_store.py +++ b/dacapo/store/local_array_store.py @@ -113,7 +113,7 @@ def remove(self, array_identifier: "LocalArrayIdentifier") -> None: ) return - logger.info("Removing dataset %s in container %s", dataset, container) + print("Removing dataset %s in container %s", dataset, container) shutil.rmtree(path) def __get_run_dir(self, run_name: str) -> Path: diff --git a/dacapo/store/local_weights_store.py b/dacapo/store/local_weights_store.py index fe72eb05..c95aca9a 100644 --- a/dacapo/store/local_weights_store.py +++ b/dacapo/store/local_weights_store.py @@ -17,7 +17,7 @@ class LocalWeightsStore(WeightsStore): """A local store for network weights.""" def __init__(self, basedir): - logger.info("Creating local weights store in directory %s", basedir) + print("Creating local weights store in directory %s", basedir) self.basedir = basedir @@ -52,7 +52,7 @@ def store_weights(self, run: Run, iteration: int): def retrieve_weights(self, run: str, iteration: int) -> Weights: """Retrieve the network weights of the given run.""" - logger.info("Retrieving weights for run %s, iteration %d", run, iteration) + print("Retrieving weights for run %s, iteration %d", run, iteration) weights_name = self.__get_weights_dir(run) / "iterations" / str(iteration) @@ -107,7 +107,7 @@ def store_best(self, run: str, iteration: int, dataset: str, criterion: str): f.write(json.dumps({"iteration": iteration})) def retrieve_best(self, run: str, dataset: str | Dataset, criterion: str) -> int: - logger.info("Retrieving weights for run %s, criterion %s", run, criterion) + print("Retrieving weights for run %s, criterion %s", run, criterion) with (self.__get_weights_dir(run) / criterion / f"{dataset}.json").open( "r" @@ -117,7 +117,7 @@ def retrieve_best(self, run: str, dataset: str | Dataset, criterion: str) -> int return weights_info["iteration"] def _load_best(self, run: Run, criterion: str): - logger.info("Retrieving weights for run %s, criterion %s", run, criterion) + print("Retrieving weights for run %s, criterion %s", run, criterion) weights_name = self.__get_weights_dir(run) / f"{criterion}" diff --git a/dacapo/store/mongo_config_store.py b/dacapo/store/mongo_config_store.py index 5739dac5..f89ee94d 100644 --- a/dacapo/store/mongo_config_store.py +++ b/dacapo/store/mongo_config_store.py @@ -22,7 +22,7 @@ class MongoConfigStore(ConfigStore): """ def __init__(self, db_host, db_name): - logger.info( + print( "Creating MongoConfigStore:\n\thost : %s\n\tdatabase: %s", db_host, db_name, diff --git a/dacapo/store/mongo_stats_store.py b/dacapo/store/mongo_stats_store.py index d0398caf..06cc832e 100644 --- a/dacapo/store/mongo_stats_store.py +++ b/dacapo/store/mongo_stats_store.py @@ -16,7 +16,7 @@ class MongoStatsStore(StatsStore): """ def __init__(self, db_host, db_name): - logger.info( + print( "Creating MongoStatsStore:\n\thost : %s\n\tdatabase: %s", db_host, db_name, @@ -41,7 +41,7 @@ def store_training_stats(self, run_name: str, stats: TrainingStats): if stats.trained_until() > existing_stats.trained_until(): # current stats go further than the one in DB store_from_iteration = existing_stats.trained_until() - logger.info( + print( "Updating training stats of run %s after iteration %d", run_name, store_from_iteration, @@ -76,7 +76,7 @@ def store_validation_iteration_scores( self.__delete_validation_scores(run_name) if store_from_iteration > 0: - logger.info( + print( "Updating validation scores of run %s after iteration " "%d", run_name, store_from_iteration, diff --git a/dacapo/train.py b/dacapo/train.py index 9b43d26d..6a0d00d5 100644 --- a/dacapo/train.py +++ b/dacapo/train.py @@ -27,7 +27,7 @@ def train(run_name: str): # # we are done here. # return - logger.info("Training run %s", run_name) + print("Training run %s", run_name) # create run @@ -39,7 +39,7 @@ def train(run_name: str): def train_run(run: Run): - logger.info("Starting/resuming training for run %s...", run) + print("Starting/resuming training for run %s...", run) # create run @@ -52,13 +52,13 @@ def train_run(run: Run): trained_until = run.training_stats.trained_until() validated_until = run.validation_scores.validated_until() if validated_until > trained_until: - logger.info( + print( f"Trained until {trained_until}, but validated until {validated_until}! " "Deleting extra validation stats" ) run.validation_scores.delete_after(trained_until) - logger.info("Current state: trained until %d/%d", trained_until, run.train_until) + print("Current state: trained until %d/%d", trained_until, run.train_until) # read weights of the latest iteration @@ -95,7 +95,7 @@ def train_run(run: Run): weights_store.retrieve_weights(run, iteration=trained_until) elif latest_weights_iteration == trained_until: - logger.info("Resuming training from iteration %d", trained_until) + print("Resuming training from iteration %d", trained_until) weights_store.retrieve_weights(run, iteration=trained_until) @@ -204,4 +204,4 @@ def train_run(run: Run): run.move_optimizer(compute_context.device) run.model.train() - logger.info("Trained until %d, finished.", trained_until) + print("Trained until %d, finished.", trained_until) diff --git a/dacapo/validate.py b/dacapo/validate.py index 65f5b040..e1d6065a 100644 --- a/dacapo/validate.py +++ b/dacapo/validate.py @@ -27,7 +27,7 @@ def validate( stored checkpoint. Returns the best parameters and scores for this iteration.""" - logger.info("Validating run %s at iteration %d...", run_name, iteration) + print("Validating run %s at iteration %d...", run_name, iteration) # create run @@ -78,7 +78,7 @@ def validate_run( or len(run.datasplit.validate) == 0 or run.datasplit.validate[0].gt is None ): - logger.info("Cannot validate run %s. Continuing training!", run.name) + print("Cannot validate run %s. Continuing training!", run.name) return None, None # get array and weight store @@ -100,9 +100,7 @@ def validate_run( ) raise NotImplementedError - logger.info( - "Validating run %s on dataset %s", run.name, validation_dataset.name - ) + print("Validating run %s on dataset %s", run.name, validation_dataset.name) ( input_raw_array_identifier, @@ -116,7 +114,7 @@ def validate_run( f"{input_gt_array_identifier.container}/{input_gt_array_identifier.dataset}" ).exists() ): - logger.info("Copying validation inputs!") + print("Copying validation inputs!") input_voxel_size = validation_dataset.raw.voxel_size output_voxel_size = run.model.scale(input_voxel_size) input_shape = run.model.eval_input_shape @@ -154,7 +152,7 @@ def validate_run( ) input_gt[output_roi] = validation_dataset.gt[output_roi] else: - logger.info("validation inputs already copied!") + print("validation inputs already copied!") prediction_array_identifier = array_store.validation_prediction_array( run.name, iteration, validation_dataset.name @@ -171,7 +169,7 @@ def validate_run( overwrite=overwrite, ) - logger.info("Predicted on dataset %s", validation_dataset.name) + print("Predicted on dataset %s", validation_dataset.name) post_processor.set_prediction(prediction_array_identifier) From db7d8aa0950939f90c666d55304edf62f958f42b Mon Sep 17 00:00:00 2001 From: rhoadesScholar Date: Mon, 11 Mar 2024 15:53:12 -0400 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20=E2=9A=A1=EF=B8=8F=20Fix=20logging,?= =?UTF-8?q?=20block=20alignment,=20tmpdirs.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dacapo/blockwise/predict_worker.py | 6 ------ dacapo/blockwise/relabel_worker.py | 10 +++++++++- dacapo/blockwise/segment_worker.py | 7 +++---- dacapo/cli.py | 2 ++ dacapo/compute_context/bsub.py | 15 +++++++++++---- 5 files changed, 25 insertions(+), 15 deletions(-) diff --git a/dacapo/blockwise/predict_worker.py b/dacapo/blockwise/predict_worker.py index 68a2eec1..e1b49b0c 100644 --- a/dacapo/blockwise/predict_worker.py +++ b/dacapo/blockwise/predict_worker.py @@ -165,12 +165,6 @@ def start_worker( output_size, voxel_size=output_voxel_size, ) - # # use daisy requests to run pipeline - # pipeline += gp.DaisyRequestBlocks( - # reference=request, - # roi_map={raw: "read_roi", prediction: "write_roi"}, - # num_workers=1, - # ) daisy_client = daisy.Client() diff --git a/dacapo/blockwise/relabel_worker.py b/dacapo/blockwise/relabel_worker.py index 654be6bd..b374f712 100644 --- a/dacapo/blockwise/relabel_worker.py +++ b/dacapo/blockwise/relabel_worker.py @@ -1,6 +1,7 @@ from glob import glob import os import sys +from time import sleep import daisy from dacapo.compute_context import create_compute_context from dacapo.store.array_store import LocalArrayIdentifier @@ -54,7 +55,14 @@ def start_worker( if block is None: break - relabel_in_block(array_out, nodes, components, block) + try: + relabel_in_block(array_out, nodes, components, block) + except OSError as e: + logging.error( + f"Failed to relabel block {block.write_roi}: {e}. Trying again." + ) + sleep(1) + relabel_in_block(array_out, nodes, components, block) def relabel_in_block(array_out, old_values, new_values, block): diff --git a/dacapo/blockwise/segment_worker.py b/dacapo/blockwise/segment_worker.py index 1d01ec0c..da1e0c09 100644 --- a/dacapo/blockwise/segment_worker.py +++ b/dacapo/blockwise/segment_worker.py @@ -167,11 +167,10 @@ def start_worker( edges = unique_pairs[non_zero_filter] nodes = np.unique(edges) - print(f"Writing ids to {os.path.join(tmpdir, 'block_%d.npz')}") assert os.path.exists(tmpdir) - with open( - os.path.join(tmpdir, f"block_{block.block_id[1]}.npz"), "wb" - ) as f: + path = os.path.join(tmpdir, f"block_{block.block_id[1]}.npz") + print(f"Writing ids to {path}") + with open(path, "wb") as f: np.savez_compressed( f, nodes=nodes, diff --git a/dacapo/cli.py b/dacapo/cli.py index 22cd453a..4136c460 100644 --- a/dacapo/cli.py +++ b/dacapo/cli.py @@ -262,6 +262,7 @@ def run_blockwise( input_array.voxel_size, output_dtype, overwrite=overwrite, + write_size=write_roi.shape, ) _run_blockwise( # type: ignore @@ -383,6 +384,7 @@ def segment_blockwise( input_array.voxel_size, np.uint64, overwrite=overwrite, + write_size=write_roi.shape, ) print( f"Created output array {output_array_identifier.container}:{output_array_identifier.dataset} with ROI {_total_roi}." diff --git a/dacapo/compute_context/bsub.py b/dacapo/compute_context/bsub.py index f89b0805..2c10da73 100644 --- a/dacapo/compute_context/bsub.py +++ b/dacapo/compute_context/bsub.py @@ -1,4 +1,7 @@ +import os +from pathlib import Path from .compute_context import ComputeContext +import daisy import attr @@ -36,6 +39,10 @@ def device(self): return "cpu" def _wrap_command(self, command): + client = daisy.Client() + basename = str( + Path("./daisy_logs", client.task_id, f"worker_{client.worker_id}") + ) return ( [ "bsub", @@ -47,10 +54,10 @@ def _wrap_command(self, command): f"num={self.num_gpus}", "-J", "dacapo", - # "-o", - # f"{run_name}_train.out", - # "-e", - # f"{run_name}_train.err", + "-o", + f"{basename}.out", + "-e", + f"{basename}.err", ] + ( [