diff --git a/mirar/monitor/base_monitor.py b/mirar/monitor/base_monitor.py index 2f106449e..ba8500eee 100644 --- a/mirar/monitor/base_monitor.py +++ b/mirar/monitor/base_monitor.py @@ -20,7 +20,7 @@ from watchdog.observers import Observer from mirar.data import Dataset, Image, ImageBatch -from mirar.errors import ErrorReport, ErrorStack, ImageNotFoundError +from mirar.errors import ErrorReport, ErrorStack, ImageNotFoundError, ProcessorError from mirar.io import check_file_is_complete from mirar.paths import ( DITHER_N_KEY, @@ -49,6 +49,10 @@ logger = logging.getLogger(__name__) +class ImageTimeoutError(ProcessorError): + """Timeout for downloading an image has been exceeded.""" + + class NewImageHandler(FileSystemEventHandler): """Class to watch a directory, and add newly-created files to a queue.""" @@ -439,17 +443,36 @@ def process_load_queue(self, queue: Queue): # Verify that file transfer is complete, useful for rsync latency transfer_done = False + t_start = Time.now() while not transfer_done: transfer_done = check_file_is_complete(event.src_path) if not transfer_done: print( - "Seems like the file is not fully transferred. " + f"Seems like the file {event.src_path} is not " + f"fully transferred. " "Waiting a couple of seconds before trying again." ) time.sleep(3) + # If a corrupt image comes in, give up once 60 s have elapsed (.sec = elapsed seconds; a bare number would not be read as seconds) + if (Time.now() - t_start).sec > 60: + err = ( + f"File {event.src_path} has not been fully " + f"transferred after 60 seconds. Skipping this file." + ) + logger.error(err) + exc = ImageTimeoutError(err) + err_report = ErrorReport( + exc, "monitor", contents=[event.src_path] + ) + self.errorstack.add_report(err_report) + self.update_error_log() + + self.failed_images.append(event.src_path) + break  # NOTE(review): this only leaves the wait loop - confirm the load below is really skipped + try: # Start processing img_batch = self.pipeline.load_raw_image(event.src_path)