diff --git a/CHANGELOG.md b/CHANGELOG.md
index a99aa9e30c..6a5c0c083b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,12 +1,16 @@
 **Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository.
 
-# 2.3.8 (Unreleased)
+# 2.3.8
 
 > NOTE: `FRACTAL_API_V1_MODE="include_without_submission"` is now transformed
 > into `FRACTAL_API_V1_MODE="include_read_only"`.
 
-* API
+* API:
   * Support read-only mode for V1 (\#1701).
+  * Improve handling of zipped job-folder in download-logs endpoints (\#1702).
+* Runner:
+  * Improve database-error handling in V2 job execution (\#1702).
+  * Zip job folder after job execution (\#1702).
 * App:
   * `UvicornWorker` is now imported from `uvicorn-worker` (\#1690).
 * Testing:
diff --git a/fractal_server/app/routes/admin/v1.py b/fractal_server/app/routes/admin/v1.py
index 163c255b67..68b28dea7c 100644
--- a/fractal_server/app/routes/admin/v1.py
+++ b/fractal_server/app/routes/admin/v1.py
@@ -18,6 +18,7 @@
 from ....config import get_settings
 from ....syringe import Inject
 from ....utils import get_timestamp
+from ....zip_tools import _zip_folder_to_byte_stream_iterator
 from ...db import AsyncSession
 from ...db import get_async_db
 from ...models.security import UserOAuth as User
@@ -34,7 +35,6 @@
 from ...schemas.v1 import WorkflowReadV1
 from ...security import current_active_superuser
 from ..aux._job import _write_shutdown_file
-from ..aux._job import _zip_folder_to_byte_stream
 from ..aux._runner import _check_shutdown_is_supported
 
 router_admin_v1 = APIRouter()
@@ -387,9 +387,8 @@ async def download_job_logs(
     # Create and return byte stream for zipped log folder
     PREFIX_ZIP = Path(job.working_dir).name
     zip_filename = f"{PREFIX_ZIP}_archive.zip"
-    byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
     return StreamingResponse(
-        iter([byte_stream.getvalue()]),
+        _zip_folder_to_byte_stream_iterator(folder=job.working_dir),
         media_type="application/x-zip-compressed",
         headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
     )
diff --git a/fractal_server/app/routes/admin/v2.py b/fractal_server/app/routes/admin/v2.py
index 50c0d9a057..f86ae08244 100644
--- a/fractal_server/app/routes/admin/v2.py
+++ b/fractal_server/app/routes/admin/v2.py
@@ -21,6 +21,7 @@
 from ....config import get_settings
 from ....syringe import Inject
 from ....utils import get_timestamp
+from ....zip_tools import _zip_folder_to_byte_stream_iterator
 from ...db import AsyncSession
 from ...db import get_async_db
 from ...models.security import UserOAuth as User
@@ -37,7 +38,6 @@
 from ...schemas.v2 import ProjectReadV2
 from ...security import current_active_superuser
 from ..aux._job import _write_shutdown_file
-from ..aux._job import _zip_folder_to_byte_stream
 from ..aux._runner import _check_shutdown_is_supported
 
 router_admin_v2 = APIRouter()
@@ -274,9 +274,8 @@ async def download_job_logs(
     # Create and return byte stream for zipped log folder
     PREFIX_ZIP = Path(job.working_dir).name
     zip_filename = f"{PREFIX_ZIP}_archive.zip"
-    byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
     return StreamingResponse(
-        iter([byte_stream.getvalue()]),
+        _zip_folder_to_byte_stream_iterator(folder=job.working_dir),
         media_type="application/x-zip-compressed",
         headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
     )
diff --git a/fractal_server/app/routes/api/v1/job.py b/fractal_server/app/routes/api/v1/job.py
index 55f79ba400..be785f73a0 100644
--- a/fractal_server/app/routes/api/v1/job.py
+++ b/fractal_server/app/routes/api/v1/job.py
@@ -8,6 +8,7 @@
 from fastapi.responses import StreamingResponse
 from sqlmodel import select
 
+from .....zip_tools import _zip_folder_to_byte_stream_iterator
 from ....db import AsyncSession
 from ....db import get_async_db
 from ....models.v1 import ApplyWorkflow
@@ -18,7 +19,6 @@
 from ....security import current_active_user
 from ....security import User
 from ...aux._job import _write_shutdown_file
-from ...aux._job import _zip_folder_to_byte_stream
 from ...aux._runner import _check_shutdown_is_supported
 from ._aux_functions import _get_job_check_owner
 from ._aux_functions import _get_project_check_owner
@@ -128,9 +128,8 @@ async def download_job_logs(
     # Create and return byte stream for zipped log folder
     PREFIX_ZIP = Path(job.working_dir).name
     zip_filename = f"{PREFIX_ZIP}_archive.zip"
-    byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
     return StreamingResponse(
-        iter([byte_stream.getvalue()]),
+        _zip_folder_to_byte_stream_iterator(folder=job.working_dir),
         media_type="application/x-zip-compressed",
         headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
     )
diff --git a/fractal_server/app/routes/api/v2/job.py b/fractal_server/app/routes/api/v2/job.py
index 02da7354bd..a5980fba5b 100644
--- a/fractal_server/app/routes/api/v2/job.py
+++ b/fractal_server/app/routes/api/v2/job.py
@@ -8,6 +8,7 @@
 from fastapi.responses import StreamingResponse
 from sqlmodel import select
 
+from .....zip_tools import _zip_folder_to_byte_stream_iterator
 from ....db import AsyncSession
 from ....db import get_async_db
 from ....models.v2 import JobV2
@@ -18,7 +19,6 @@
 from ....security import current_active_user
 from ....security import User
 from ...aux._job import _write_shutdown_file
-from ...aux._job import _zip_folder_to_byte_stream
 from ...aux._runner import _check_shutdown_is_supported
 from ._aux_functions import _get_job_check_owner
 from ._aux_functions import _get_project_check_owner
@@ -118,7 +118,7 @@ async def download_job_logs(
     db: AsyncSession = Depends(get_async_db),
 ) -> StreamingResponse:
     """
-    Download job folder
+    Download zipped job folder
     """
     output = await _get_job_check_owner(
         project_id=project_id,
@@ -127,15 +127,11 @@ async def download_job_logs(
         job_id=job_id,
         db=db,
     )
     job = output["job"]
-
-    # Create and return byte stream for zipped log folder
-    PREFIX_ZIP = Path(job.working_dir).name
-    zip_filename = f"{PREFIX_ZIP}_archive.zip"
-    byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
+    zip_name = f"{Path(job.working_dir).name}_archive.zip"
     return StreamingResponse(
-        iter([byte_stream.getvalue()]),
+        _zip_folder_to_byte_stream_iterator(folder=job.working_dir),
         media_type="application/x-zip-compressed",
-        headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
+        headers={"Content-Disposition": f"attachment;filename={zip_name}"},
     )
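All four `download_job_logs` endpoints touched above switch from building the entire archive in memory (`BytesIO`, then `iter([byte_stream.getvalue()])`) to handing `StreamingResponse` a chunk iterator, so an archive that already exists on disk is streamed without ever being held in RAM at once. A minimal standalone sketch of the pattern (the app, path, and route below are hypothetical, not fractal-server's API):

```python
from pathlib import Path
from typing import Iterator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def stream_file(path: Path, chunk_size: int = 64 * 1024) -> Iterator[bytes]:
    # Yield fixed-size chunks instead of loading the whole file into memory.
    with path.open("rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk


@app.get("/download/")
def download() -> StreamingResponse:
    zip_path = Path("/tmp/job_0001_archive.zip")  # hypothetical archive path
    return StreamingResponse(
        stream_file(zip_path),
        media_type="application/x-zip-compressed",
        headers={"Content-Disposition": f"attachment;filename={zip_path.name}"},
    )
```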
diff --git a/fractal_server/app/routes/aux/_job.py b/fractal_server/app/routes/aux/_job.py
index 31188f0f4a..e15eb81745 100644
--- a/fractal_server/app/routes/aux/_job.py
+++ b/fractal_server/app/routes/aux/_job.py
@@ -1,9 +1,5 @@
-import os
-from io import BytesIO
 from pathlib import Path
 from typing import Union
-from zipfile import ZIP_DEFLATED
-from zipfile import ZipFile
 
 from ...models.v1 import ApplyWorkflow
 from ...models.v2 import JobV2
@@ -24,22 +20,3 @@ def _write_shutdown_file(*, job: Union[ApplyWorkflow, JobV2]):
     shutdown_file = Path(job.working_dir) / SHUTDOWN_FILENAME
     with shutdown_file.open("w") as f:
         f.write(f"Trigger executor shutdown for {job.id=}.")
-
-
-def _zip_folder_to_byte_stream(*, folder: str) -> BytesIO:
-    """
-    Get byte stream with the zipped log folder of a job.
-
-    Args:
-        folder: the folder to zip
-    """
-
-    byte_stream = BytesIO()
-    with ZipFile(byte_stream, mode="w", compression=ZIP_DEFLATED) as zipfile:
-        for root, dirs, files in os.walk(folder):
-            for file in files:
-                file_path = os.path.join(root, file)
-                archive_path = os.path.relpath(file_path, folder)
-                zipfile.write(file_path, archive_path)
-
-    return byte_stream
diff --git a/fractal_server/app/runner/v2/__init__.py b/fractal_server/app/runner/v2/__init__.py
index 45644aae5c..eae855a90b 100644
--- a/fractal_server/app/runner/v2/__init__.py
+++ b/fractal_server/app/runner/v2/__init__.py
@@ -5,7 +5,6 @@
 Other subystems should only import this module and not its submodules or
 the individual backends.
 """
-import logging
 import os
 import traceback
 from pathlib import Path
@@ -21,6 +20,7 @@
 from ....ssh._fabric import FractalSSH
 from ....syringe import Inject
 from ....utils import get_timestamp
+from ....zip_tools import _zip_folder_to_file_and_remove
 from ...db import DB
 from ...models.v2 import DatasetV2
 from ...models.v2 import JobV2
@@ -114,9 +114,34 @@ async def submit_workflow(
 
     with next(DB.get_sync_db()) as db_sync:
 
-        job: JobV2 = db_sync.get(JobV2, job_id)
-        if not job:
+        try:
+            job: Optional[JobV2] = db_sync.get(JobV2, job_id)
+            dataset: Optional[DatasetV2] = db_sync.get(DatasetV2, dataset_id)
+            workflow: Optional[WorkflowV2] = db_sync.get(
+                WorkflowV2, workflow_id
+            )
+        except Exception as e:
+            logger.error(
+                f"Error connecting to the database. Original error: {str(e)}"
+            )
+            reset_logger_handlers(logger)
+            return
+
+        if job is None:
             logger.error(f"JobV2 {job_id} does not exist")
+            reset_logger_handlers(logger)
+            return
+        if dataset is None or workflow is None:
+            log_msg = ""
+            if not dataset:
+                log_msg += f"Cannot fetch dataset {dataset_id} from database\n"
+            if not workflow:
+                log_msg += (
+                    f"Cannot fetch workflow {workflow_id} from database\n"
+                )
+            fail_job(
+                db=db_sync, job=job, log_msg=log_msg, logger_name=logger_name
+            )
             return
 
         # Declare runner backend and set `process_workflow` function
@@ -137,21 +162,6 @@ async def submit_workflow(
             )
             return
 
-        dataset: DatasetV2 = db_sync.get(DatasetV2, dataset_id)
-        workflow: WorkflowV2 = db_sync.get(WorkflowV2, workflow_id)
-        if not (dataset and workflow):
-            log_msg = ""
-            if not dataset:
-                log_msg += f"Cannot fetch dataset {dataset_id} from database\n"
-            if not workflow:
-                log_msg += (
-                    f"Cannot fetch workflow {workflow_id} from database\n"
-                )
-            fail_job(
-                db=db_sync, job=job, log_msg=log_msg, logger_name=logger_name
-            )
-            return
-
         # Define and create server-side working folder
         WORKFLOW_DIR_LOCAL = Path(job.working_dir)
         if WORKFLOW_DIR_LOCAL.exists():
@@ -192,9 +202,9 @@ async def submit_workflow(
                 fractal_ssh.mkdir(
                     folder=str(WORKFLOW_DIR_REMOTE),
                 )
-                logging.info(f"Created {str(WORKFLOW_DIR_REMOTE)} via SSH.")
+                logger.info(f"Created {str(WORKFLOW_DIR_REMOTE)} via SSH.")
             else:
-                logging.error(
+                logger.error(
                     "Invalid FRACTAL_RUNNER_BACKEND="
                     f"{settings.FRACTAL_RUNNER_BACKEND}."
                 )
@@ -219,7 +229,7 @@ async def submit_workflow(
                     user=slurm_user,
                 )
             else:
-                logging.info("Skip remote-subfolder creation")
+                logger.info("Skip remote-subfolder creation")
         except Exception as e:
             error_type = type(e).__name__
             fail_job(
@@ -448,3 +458,4 @@ async def submit_workflow(
     finally:
         reset_logger_handlers(logger)
         db_sync.close()
+        _zip_folder_to_file_and_remove(folder=job.working_dir)
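The reworked `submit_workflow` above fetches job, dataset, and workflow behind a single try/except, so a database-connection error is logged once and aborts the submission early; the `job is None` case is handled first because `fail_job` needs a job row to record the error on; and the `finally` clause now always archives the working directory via `_zip_folder_to_file_and_remove`. A toy sketch of this control flow, with plain dicts standing in for the database tables (all names below are hypothetical, not fractal-server's code):

```python
from typing import Optional

# Toy stand-ins for the JobV2 / DatasetV2 / WorkflowV2 tables.
JOBS = {1: "job-1"}
DATASETS = {1: "dataset-1"}
WORKFLOWS: dict[int, str] = {}


def fail_job(job: str, log_msg: str) -> None:
    # Stand-in for fractal-server's fail_job: persist the error on the job.
    print(f"{job} failed: {log_msg!r}")


def submit(job_id: int, dataset_id: int, workflow_id: int) -> Optional[str]:
    # Fetch everything up front, behind a single error boundary, so that a
    # broken database connection is logged once and aborts early.
    try:
        job = JOBS.get(job_id)
        dataset = DATASETS.get(dataset_id)
        workflow = WORKFLOWS.get(workflow_id)
    except Exception as e:  # e.g. a database connection error
        print(f"Error connecting to the database. Original error: {e}")
        return None

    # No job row means there is nothing to mark as failed: log and stop.
    if job is None:
        print(f"Job {job_id} does not exist")
        return None

    # With a job but a missing dataset/workflow, the job itself is failed.
    if dataset is None or workflow is None:
        log_msg = ""
        if dataset is None:
            log_msg += f"Cannot fetch dataset {dataset_id} from database\n"
        if workflow is None:
            log_msg += f"Cannot fetch workflow {workflow_id} from database\n"
        fail_job(job, log_msg)
        return None

    return f"run {job} on {dataset} with {workflow}"


print(submit(1, 1, 1))  # workflow 1 is missing -> the job is failed
```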
diff --git a/fractal_server/zip_tools.py b/fractal_server/zip_tools.py
new file mode 100644
index 0000000000..1210c714d7
--- /dev/null
+++ b/fractal_server/zip_tools.py
@@ -0,0 +1,110 @@
+import os
+import shutil
+from io import BytesIO
+from pathlib import Path
+from typing import Iterator
+from typing import TypeVar
+from zipfile import ZIP_DEFLATED
+from zipfile import ZipFile
+
+T = TypeVar("T", str, BytesIO)
+THRESHOLD_ZIP_FILE_SIZE_MB = 1.0
+
+
+def _create_zip(folder: str, output: T) -> T:
+    """
+    Zip a folder into a zip file or into a BytesIO.
+
+    Args:
+        folder: Folder to be zipped.
+        output: Either a string with the path of the zip file, or a BytesIO
+            object.
+
+    Returns:
+        Either the zip-file path string, or the modified BytesIO object.
+    """
+    if isinstance(output, str) and os.path.exists(output):
+        raise FileExistsError(f"Zip file '{output}' already exists")
+    if isinstance(output, BytesIO) and output.getbuffer().nbytes > 0:
+        raise ValueError("BytesIO is not empty")
+
+    with ZipFile(output, mode="w", compression=ZIP_DEFLATED) as zipfile:
+        for root, dirs, files in os.walk(folder):
+            for file in files:
+                file_path = os.path.join(root, file)
+                archive_path = os.path.relpath(file_path, folder)
+                zipfile.write(file_path, archive_path)
+    return output
+
+
+def _zip_folder_to_byte_stream_iterator(folder: str) -> Iterator:
+    """
+    Return an iterator over the byte stream of the zipped job folder.
+
+    Args:
+        folder: the folder to zip
+    """
+    zip_file = Path(f"{folder}.zip")
+
+    if os.path.exists(zip_file):
+
+        def iterfile():
+            """
+            https://fastapi.tiangolo.com/advanced/custom-response/#using-streamingresponse-with-file-like-objects
+            """
+            with open(zip_file, mode="rb") as file_like:
+                yield from file_like
+
+        return iterfile()
+
+    else:
+
+        byte_stream = _create_zip(folder, output=BytesIO())
+        return iter([byte_stream.getvalue()])
+
+
+def _folder_can_be_deleted(folder: str) -> bool:
+    """
+    Given the path of a folder as a string, return `False` if any of the
+    following holds:
+
+    - the related zip file `{folder}.zip` does not exist,
+    - the folder and the zip file contain a different number of files,
+    - the zip file is smaller than `THRESHOLD_ZIP_FILE_SIZE_MB`.
+
+    Otherwise return `True`.
+    """
+    # CHECK 1: zip file exists
+    zip_file = f"{folder}.zip"
+    if not os.path.exists(zip_file):
+        return False
+
+    # CHECK 2: folder and zip file have the same number of files
+    folder_files_count = sum(1 for f in Path(folder).rglob("*") if f.is_file())
+    with ZipFile(zip_file, "r") as zip_ref:
+        zip_files_count = len(zip_ref.namelist())
+    if folder_files_count != zip_files_count:
+        return False
+
+    # CHECK 3: zip-file size is at least `THRESHOLD_ZIP_FILE_SIZE_MB`
+    zip_size = os.path.getsize(zip_file)
+    if zip_size < THRESHOLD_ZIP_FILE_SIZE_MB * 1024 * 1024:
+        return False
+
+    return True
+
+
+def _zip_folder_to_file_and_remove(folder: str) -> None:
+    """
+    Create a ZIP archive of the given folder and remove the original folder,
+    if it can be safely deleted.
+
+    This function performs the following steps:
+
+    1. Creates a ZIP archive of `folder`, named with a temporary suffix
+       `_tmp.zip`.
+    2. Renames the ZIP file by removing the suffix (possibly overwriting an
+       existing file with the same name).
+    3. Checks whether the folder can be safely deleted, using
+       `_folder_can_be_deleted`; if so, deletes the original folder.
+    """
+    _create_zip(folder, f"{folder}_tmp.zip")
+    shutil.move(f"{folder}_tmp.zip", f"{folder}.zip")
+    if _folder_can_be_deleted(folder):
+        shutil.rmtree(folder)
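The new `zip_tools` module separates archive creation (`_create_zip`), streaming (`_zip_folder_to_byte_stream_iterator`), and guarded cleanup (`_folder_can_be_deleted` plus `_zip_folder_to_file_and_remove`). A usage sketch of that lifecycle under a scratch directory (paths and file names are illustrative):

```python
import tempfile
from pathlib import Path

from fractal_server.zip_tools import _zip_folder_to_byte_stream_iterator
from fractal_server.zip_tools import _zip_folder_to_file_and_remove

# Scratch job folder containing a single log file.
base = Path(tempfile.mkdtemp())
job_folder = base / "job_0001"
job_folder.mkdir()
(job_folder / "workflow.log").write_text("done\n")

# After job execution the runner archives the folder; the folder itself is
# only removed when `_folder_can_be_deleted` deems it safe (here the archive
# is far below THRESHOLD_ZIP_FILE_SIZE_MB, so the folder is kept).
_zip_folder_to_file_and_remove(folder=str(job_folder))
assert (base / "job_0001.zip").exists()
assert job_folder.exists()

# The download endpoints then stream the existing archive chunk by chunk.
n_bytes = sum(
    len(chunk)
    for chunk in _zip_folder_to_byte_stream_iterator(folder=str(job_folder))
)
print(f"streamed {n_bytes} bytes")
```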
diff --git a/tests/no_version/test_unit_test_zip_folder_to_byte_stream.py b/tests/no_version/test_unit_test_zip_folder_to_byte_stream.py
deleted file mode 100644
index 86dd156f34..0000000000
--- a/tests/no_version/test_unit_test_zip_folder_to_byte_stream.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from pathlib import Path
-from zipfile import ZipFile
-
-from devtools import debug
-
-from fractal_server.app.routes.aux._job import _zip_folder_to_byte_stream
-
-
-def test_zip_folder_to_byte_stream(tmp_path: Path):
-    debug(tmp_path)
-
-    # Prepare file/folder structure
-    (tmp_path / "file1").touch()
-    (tmp_path / "file2").touch()
-    (tmp_path / "folder").mkdir()
-    (tmp_path / "folder/file3").touch()
-    (tmp_path / "folder/file4").touch()
-
-    output = _zip_folder_to_byte_stream(folder=tmp_path.as_posix())
-
-    # Write BytesIO to file
-    archive_path = tmp_path / "zipped_folder.zip"
-    with archive_path.open("wb") as f:
-        f.write(output.getbuffer())
-
-    # Unzip the log archive
-    unzipped_archived_path = tmp_path / "unzipped_folder"
-    unzipped_archived_path.mkdir()
-    with ZipFile(archive_path.as_posix(), mode="r") as zipfile:
-        zipfile.extractall(path=unzipped_archived_path.as_posix())
-
-    # Verify that all expected items are present
-    glob_list = [file.name for file in unzipped_archived_path.rglob("*")]
-    debug(glob_list)
-    assert "file1" in glob_list
-    assert "file2" in glob_list
-    assert "folder" in glob_list
-    assert "file3" in glob_list
-    assert "file4" in glob_list
diff --git a/tests/no_version/test_unit_zip_tools.py b/tests/no_version/test_unit_zip_tools.py
new file mode 100644
index 0000000000..f1166e3937
--- /dev/null
+++ b/tests/no_version/test_unit_zip_tools.py
@@ -0,0 +1,151 @@
+import os
+from io import BytesIO
+from pathlib import Path
+from zipfile import ZipFile
+
+import pytest
+
+import fractal_server.zip_tools
+from fractal_server.zip_tools import _create_zip
+from fractal_server.zip_tools import _folder_can_be_deleted
+from fractal_server.zip_tools import _zip_folder_to_byte_stream_iterator
+from fractal_server.zip_tools import _zip_folder_to_file_and_remove
+
+
+def make_folder(base: Path) -> Path:
+    """
+    Creates the following folder structure inside `base`:
+
+    - test:
+        - file1
+        - subfolder1:
+            - file2
+            - subsubfolder1:
+                - file3
+            - subsubfolder2:
+                - file4
+        - subfolder2:
+            - file5
+    """
+    test_folder = base / "test"
+    test_folder.mkdir()
+    (test_folder / "subfolder1").mkdir()
+    (test_folder / "subfolder1/subsubfolder1").mkdir()
+    (test_folder / "subfolder1/subsubfolder2").mkdir()
+    (test_folder / "subfolder2").mkdir()
+    (test_folder / "file1").touch()
+    (test_folder / "subfolder1/file2").touch()
+    (test_folder / "subfolder1/subsubfolder1/file3").touch()
+    (test_folder / "subfolder1/subsubfolder2/file4").touch()
+    (test_folder / "subfolder2/file5").touch()
+    return test_folder
+
+
+def test_create_zip(tmp_path):
+    test_folder = make_folder(tmp_path)
+
+    ret = _create_zip(test_folder, f"{test_folder}.zip")
+    assert ret == f"{test_folder}.zip"
+
+    with pytest.raises(FileExistsError):
+        _create_zip(test_folder, f"{test_folder}.zip")
+    os.unlink(f"{test_folder}.zip")
+
+    with pytest.raises(ValueError):
+        _create_zip(test_folder, BytesIO(b"foo"))
+
+    ret = _create_zip(test_folder, BytesIO())
+    assert isinstance(ret, BytesIO)
+    assert ret.getbuffer().nbytes > 0
+
+
+def test_zip_folder_to_byte_stream_iterator(tmp_path: Path):
+
+    test_folder = make_folder(tmp_path)
+
+    # Case 1: zip file does not exist yet
+    output = _zip_folder_to_byte_stream_iterator(folder=test_folder)
+
+    zip_file_1 = Path(tmp_path / "foo.zip")
+    with zip_file_1.open("wb") as f:
+        for byte in output:
+            f.write(byte)
+
+    unzipped_archived_path = tmp_path / "unzipped_folder"
+    unzipped_archived_path.mkdir()
+    with ZipFile(zip_file_1.as_posix(), mode="r") as zipfile:
+        zipfile.extractall(path=unzipped_archived_path.as_posix())
+    glob_list = [file.name for file in unzipped_archived_path.rglob("*")]
+    assert "file1" in glob_list
+    assert "file2" in glob_list
+    assert "file3" in glob_list
+    assert "file4" in glob_list
+    assert "file5" in glob_list
+    assert "subfolder1" in glob_list
+    assert "subfolder2" in glob_list
+    assert "subsubfolder1" in glob_list
+    assert "subsubfolder2" in glob_list
+
+    # Case 2: zip file already exists
+    _create_zip(test_folder, output=f"{test_folder}.zip")
+    output = _zip_folder_to_byte_stream_iterator(folder=test_folder)
+
+    zip_file_2 = Path(tmp_path / "bar.zip")
+    with zip_file_2.open("wb") as f:
+        for byte in output:
+            f.write(byte)
+
+    unzipped_archived_path_2 = tmp_path / "unzipped_folder2"
+    unzipped_archived_path_2.mkdir()
+    with ZipFile(zip_file_2.as_posix(), mode="r") as zipfile:
+        zipfile.extractall(path=unzipped_archived_path_2.as_posix())
+    glob_list = [file.name for file in unzipped_archived_path_2.rglob("*")]
+    assert "file1" in glob_list
+    assert "file2" in glob_list
+    assert "file3" in glob_list
+    assert "file4" in glob_list
+    assert "file5" in glob_list
+    assert "subfolder1" in glob_list
+    assert "subfolder2" in glob_list
+    assert "subsubfolder1" in glob_list
+    assert "subsubfolder2" in glob_list
+
+
+def test_folder_can_be_deleted(tmp_path: Path, monkeypatch):
+
+    test_folder = make_folder(tmp_path)
+    assert _folder_can_be_deleted(test_folder) is False
+
+    _create_zip(test_folder, output=f"{test_folder}.zip")
+    assert _folder_can_be_deleted(test_folder) is False
+
+    # monkeypatch THRESHOLD_ZIP_FILE_SIZE_MB to make folder deletable
+    monkeypatch.setattr(
+        fractal_server.zip_tools, "THRESHOLD_ZIP_FILE_SIZE_MB", 0.0001
+    )
+    assert _folder_can_be_deleted(test_folder) is True
+
+    os.unlink(test_folder / "file1")
+    assert _folder_can_be_deleted(test_folder) is False
+
+
+def test_zip_folder_to_file_and_remove(tmp_path: Path, monkeypatch):
+
+    assert os.listdir(tmp_path) == []
+
+    test_folder = make_folder(tmp_path)
+    assert os.listdir(tmp_path) == ["test"]
+
+    _zip_folder_to_file_and_remove(test_folder)
+    assert os.listdir(tmp_path) == ["test", "test.zip"]
+
+    with ZipFile(tmp_path / "test.zip", mode="r") as zipfile:
+        assert "file1" in zipfile.namelist()
+
+    os.unlink(test_folder / "file1")
+    # monkeypatch THRESHOLD_ZIP_FILE_SIZE_MB to make folder deletable
+    monkeypatch.setattr(
+        fractal_server.zip_tools, "THRESHOLD_ZIP_FILE_SIZE_MB", 0.0001
+    )
+    _zip_folder_to_file_and_remove(test_folder)
+    assert os.listdir(tmp_path) == ["test.zip"]
+    with ZipFile(tmp_path / "test.zip", mode="r") as zipfile:
+        # `test.zip` has been overwritten by `shutil.move`
+        assert "file1" not in zipfile.namelist()
diff --git a/tests/v2/03_api/test_api_job.py b/tests/v2/03_api/test_api_job.py
index 10fa9e58ed..93f77ce635 100644
--- a/tests/v2/03_api/test_api_job.py
+++ b/tests/v2/03_api/test_api_job.py
@@ -129,22 +129,20 @@ async def test_submit_jobs_with_same_dataset(
     )
 
     # Existing jobs with done/running status
-    existing_job_A_done = await job_factory_v2(
+    await job_factory_v2(
         project_id=project.id,
        dataset_id=dataset1.id,
         workflow_id=workflow.id,
         working_dir=tmp_path.as_posix(),
         status="done",
     )
-    debug(existing_job_A_done)
-    existing_job_B_done = await job_factory_v2(
+    await job_factory_v2(
         project_id=project.id,
         dataset_id=dataset2.id,
         workflow_id=workflow.id,
         working_dir=tmp_path.as_posix(),
         status="submitted",
     )
-    debug(existing_job_B_done)
 
     # API call succeeds when the other job with the same dataset has
     # status="done"
@@ -153,9 +151,13 @@ async def test_submit_jobs_with_same_dataset(
     res = await client.post(
         f"{PREFIX}/project/{project.id}/job/submit/"
         f"?workflow_id={workflow.id}&dataset_id={dataset1.id}",
         json={},
     )
-    debug(res.json())
     assert res.status_code == 202
 
+    res = await client.get(
+        f"{PREFIX}/project/{project.id}/job/{res.json()['id']}/download/"
+    )
+    assert res.status_code == 200
+
     # API call fails when the other job with the same output_dataset has
     # status="done"
     res = await client.post(
@@ -163,7 +165,6 @@ async def test_submit_jobs_with_same_dataset(
         f"?workflow_id={workflow.id}&dataset_id={dataset2.id}",
         json={},
     )
-    debug(res.json())
     assert res.status_code == 422
     assert (
         f"Dataset {dataset2.id} is already in use" in res.json()["detail"]
     )
diff --git a/tests/v2/08_full_workflow/common_functions.py b/tests/v2/08_full_workflow/common_functions.py
index 77362933ae..ead795f835 100644
--- a/tests/v2/08_full_workflow/common_functions.py
+++ b/tests/v2/08_full_workflow/common_functions.py
@@ -1,6 +1,6 @@
 import os
 import shutil
-from glob import glob
+import zipfile
 from pathlib import Path
 from typing import Optional
 
@@ -162,26 +162,33 @@ async def full_workflow(
     debug(statuses)
     assert set(statuses.values()) == {"done"}
 
-    # Check files in root job folder
+    # Check files in zipped root job folder
     working_dir = job_status_data["working_dir"]
+    with zipfile.ZipFile(f"{working_dir}.zip", "r") as zip_ref:
+        actual_files = zip_ref.namelist()
     expected_files = [
         HISTORY_FILENAME,
         FILTERS_FILENAME,
         IMAGES_FILENAME,
         WORKFLOW_LOG_FILENAME,
     ]
-    actual_files = os.listdir(working_dir)
     assert set(expected_files) < set(actual_files)
 
     # Check files in task-0 folder
     expected_files = ["0_par_0000000.log", "0_par_0000001.log"]
-    actual_files = os.listdir(f"{working_dir}/0_create_ome_zarr_compound")
-    assert set(expected_files) < set(actual_files)
+    assert set(expected_files) < set(
+        file.split("/")[-1]
+        for file in actual_files
+        if "0_create_ome_zarr_compound" in file
+    )
 
     # Check files in task-1 folder
     expected_files = ["1_par_0000000.log", "1_par_0000001.log"]
-    actual_files = os.listdir(f"{working_dir}/1_mip_compound")
-    assert set(expected_files) < set(actual_files)
+    assert set(expected_files) < set(
+        file.split("/")[-1]
+        for file in actual_files
+        if "1_mip_compound" in file
+    )
 
 
 async def full_workflow_TaskExecutionError(
@@ -570,9 +577,8 @@ async def workflow_with_non_python_task(
     # Check that the expected files are present
     working_dir = job_status_data["working_dir"]
-    glob_list = [Path(x).name for x in glob(f"{working_dir}/*")] + [
-        Path(x).name for x in glob(f"{working_dir}/*/*")
-    ]
+    with zipfile.ZipFile(f"{working_dir}.zip", "r") as zip_ref:
+        glob_list = [name.split("/")[-1] for name in zip_ref.namelist()]
 
     must_exist = [
         "0.log",
@@ -588,7 +594,8 @@ async def workflow_with_non_python_task(
         raise ValueError(f"{f} must exist, but {glob_list=}")
 
     # Check that stderr and stdout are as expected
-    with open(f"{working_dir}/0_non_python/0.log", "r") as f:
-        log = f.read()
+    with zipfile.ZipFile(f"{working_dir}.zip", "r") as zip_ref:
+        with zip_ref.open("0_non_python/0.log", "r") as file:
+            log = file.read().decode("utf-8")
     assert "This goes to standard output" in log
     assert "This goes to standard error" in log
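Since every `/download/` endpoint now returns a zip archive, a client consumes it as a byte stream and unpacks it locally. A sketch of that consumption with `httpx` (the URL and token are placeholders for a real fractal-server deployment):

```python
import io
import zipfile

import httpx

# Placeholder URL and token; adapt to an actual deployment.
url = "http://localhost:8000/api/v2/project/1/job/1/download/"
headers = {"Authorization": "Bearer <token>"}

# Stream the response body into a buffer, chunk by chunk.
buffer = io.BytesIO()
with httpx.stream("GET", url, headers=headers) as response:
    response.raise_for_status()
    for chunk in response.iter_bytes():
        buffer.write(chunk)

# Inspect the downloaded archive.
with zipfile.ZipFile(buffer) as zf:
    print(zf.namelist())  # e.g. workflow.log and per-task log files
```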