Skip to content

Commit

Permalink
Merge pull request #1702 from fractal-analytics-platform/1521-zip-job…
Browse files Browse the repository at this point in the history
…-folder-after-job-is-complete

Zip job folder after job is complete
  • Loading branch information
tcompa committed Aug 27, 2024
2 parents da063f4 + 5192d5f commit 096883f
Show file tree
Hide file tree
Showing 12 changed files with 336 additions and 121 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
**Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository.

# 2.3.8 (Unreleased)
# 2.3.8

> NOTE: `FRACTAL_API_V1_MODE="include_without_submission"` is now transformed
> into `FRACTAL_API_V1_MODE="include_read_only"`.
* API
* API:
* Support read-only mode for V1 (\#1701).
* Improve handling of zipped job-folder in download-logs endpoints (\#1702).
* Runner:
* Improve database-error handling in V2 job execution (\#1702).
* Zip job folder after job execution (\#1702).
* App:
* `UvicornWorker` is now imported from `uvicorn-worker` (\#1690).
* Testing:
Expand Down
5 changes: 2 additions & 3 deletions fractal_server/app/routes/admin/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ....config import get_settings
from ....syringe import Inject
from ....utils import get_timestamp
from ....zip_tools import _zip_folder_to_byte_stream_iterator
from ...db import AsyncSession
from ...db import get_async_db
from ...models.security import UserOAuth as User
Expand All @@ -34,7 +35,6 @@
from ...schemas.v1 import WorkflowReadV1
from ...security import current_active_superuser
from ..aux._job import _write_shutdown_file
from ..aux._job import _zip_folder_to_byte_stream
from ..aux._runner import _check_shutdown_is_supported

router_admin_v1 = APIRouter()
Expand Down Expand Up @@ -387,9 +387,8 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
_zip_folder_to_byte_stream_iterator(folder=job.working_dir),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
)
5 changes: 2 additions & 3 deletions fractal_server/app/routes/admin/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ....config import get_settings
from ....syringe import Inject
from ....utils import get_timestamp
from ....zip_tools import _zip_folder_to_byte_stream_iterator
from ...db import AsyncSession
from ...db import get_async_db
from ...models.security import UserOAuth as User
Expand All @@ -37,7 +38,6 @@
from ...schemas.v2 import ProjectReadV2
from ...security import current_active_superuser
from ..aux._job import _write_shutdown_file
from ..aux._job import _zip_folder_to_byte_stream
from ..aux._runner import _check_shutdown_is_supported

router_admin_v2 = APIRouter()
Expand Down Expand Up @@ -274,9 +274,8 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
_zip_folder_to_byte_stream_iterator(folder=job.working_dir),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
)
Expand Down
5 changes: 2 additions & 3 deletions fractal_server/app/routes/api/v1/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from fastapi.responses import StreamingResponse
from sqlmodel import select

from .....zip_tools import _zip_folder_to_byte_stream_iterator
from ....db import AsyncSession
from ....db import get_async_db
from ....models.v1 import ApplyWorkflow
Expand All @@ -18,7 +19,6 @@
from ....security import current_active_user
from ....security import User
from ...aux._job import _write_shutdown_file
from ...aux._job import _zip_folder_to_byte_stream
from ...aux._runner import _check_shutdown_is_supported
from ._aux_functions import _get_job_check_owner
from ._aux_functions import _get_project_check_owner
Expand Down Expand Up @@ -128,9 +128,8 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
_zip_folder_to_byte_stream_iterator(folder=job.working_dir),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
)
Expand Down
14 changes: 5 additions & 9 deletions fractal_server/app/routes/api/v2/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from fastapi.responses import StreamingResponse
from sqlmodel import select

from .....zip_tools import _zip_folder_to_byte_stream_iterator
from ....db import AsyncSession
from ....db import get_async_db
from ....models.v2 import JobV2
Expand All @@ -18,7 +19,6 @@
from ....security import current_active_user
from ....security import User
from ...aux._job import _write_shutdown_file
from ...aux._job import _zip_folder_to_byte_stream
from ...aux._runner import _check_shutdown_is_supported
from ._aux_functions import _get_job_check_owner
from ._aux_functions import _get_project_check_owner
Expand Down Expand Up @@ -118,7 +118,7 @@ async def download_job_logs(
db: AsyncSession = Depends(get_async_db),
) -> StreamingResponse:
"""
Download job folder
Download zipped job folder
"""
output = await _get_job_check_owner(
project_id=project_id,
Expand All @@ -127,15 +127,11 @@ async def download_job_logs(
db=db,
)
job = output["job"]

# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
zip_name = f"{Path(job.working_dir).name}_archive.zip"
return StreamingResponse(
iter([byte_stream.getvalue()]),
_zip_folder_to_byte_stream_iterator(folder=job.working_dir),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
headers={"Content-Disposition": f"attachment;filename={zip_name}"},
)


Expand Down
23 changes: 0 additions & 23 deletions fractal_server/app/routes/aux/_job.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import os
from io import BytesIO
from pathlib import Path
from typing import Union
from zipfile import ZIP_DEFLATED
from zipfile import ZipFile

from ...models.v1 import ApplyWorkflow
from ...models.v2 import JobV2
Expand All @@ -24,22 +20,3 @@ def _write_shutdown_file(*, job: Union[ApplyWorkflow, JobV2]):
shutdown_file = Path(job.working_dir) / SHUTDOWN_FILENAME
with shutdown_file.open("w") as f:
f.write(f"Trigger executor shutdown for {job.id=}.")


def _zip_folder_to_byte_stream(*, folder: str) -> BytesIO:
"""
Get byte stream with the zipped log folder of a job.
Args:
folder: the folder to zip
"""

byte_stream = BytesIO()
with ZipFile(byte_stream, mode="w", compression=ZIP_DEFLATED) as zipfile:
for root, dirs, files in os.walk(folder):
for file in files:
file_path = os.path.join(root, file)
archive_path = os.path.relpath(file_path, folder)
zipfile.write(file_path, archive_path)

return byte_stream
53 changes: 32 additions & 21 deletions fractal_server/app/runner/v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
Other subystems should only import this module and not its submodules or
the individual backends.
"""
import logging
import os
import traceback
from pathlib import Path
Expand All @@ -21,6 +20,7 @@
from ....ssh._fabric import FractalSSH
from ....syringe import Inject
from ....utils import get_timestamp
from ....zip_tools import _zip_folder_to_file_and_remove
from ...db import DB
from ...models.v2 import DatasetV2
from ...models.v2 import JobV2
Expand Down Expand Up @@ -114,9 +114,34 @@ async def submit_workflow(

with next(DB.get_sync_db()) as db_sync:

job: JobV2 = db_sync.get(JobV2, job_id)
if not job:
try:
job: Optional[JobV2] = db_sync.get(JobV2, job_id)
dataset: Optional[DatasetV2] = db_sync.get(DatasetV2, dataset_id)
workflow: Optional[WorkflowV2] = db_sync.get(
WorkflowV2, workflow_id
)
except Exception as e:
logger.error(
f"Error conneting to the database. Original error: {str(e)}"
)
reset_logger_handlers(logger)
return

if job is None:
logger.error(f"JobV2 {job_id} does not exist")
reset_logger_handlers(logger)
return
if dataset is None or workflow is None:
log_msg = ""
if not dataset:
log_msg += f"Cannot fetch dataset {dataset_id} from database\n"
if not workflow:
log_msg += (
f"Cannot fetch workflow {workflow_id} from database\n"
)
fail_job(
db=db_sync, job=job, log_msg=log_msg, logger_name=logger_name
)
return

# Declare runner backend and set `process_workflow` function
Expand All @@ -137,21 +162,6 @@ async def submit_workflow(
)
return

dataset: DatasetV2 = db_sync.get(DatasetV2, dataset_id)
workflow: WorkflowV2 = db_sync.get(WorkflowV2, workflow_id)
if not (dataset and workflow):
log_msg = ""
if not dataset:
log_msg += f"Cannot fetch dataset {dataset_id} from database\n"
if not workflow:
log_msg += (
f"Cannot fetch workflow {workflow_id} from database\n"
)
fail_job(
db=db_sync, job=job, log_msg=log_msg, logger_name=logger_name
)
return

# Define and create server-side working folder
WORKFLOW_DIR_LOCAL = Path(job.working_dir)
if WORKFLOW_DIR_LOCAL.exists():
Expand Down Expand Up @@ -192,9 +202,9 @@ async def submit_workflow(
fractal_ssh.mkdir(
folder=str(WORKFLOW_DIR_REMOTE),
)
logging.info(f"Created {str(WORKFLOW_DIR_REMOTE)} via SSH.")
logger.info(f"Created {str(WORKFLOW_DIR_REMOTE)} via SSH.")
else:
logging.error(
logger.error(
"Invalid FRACTAL_RUNNER_BACKEND="
f"{settings.FRACTAL_RUNNER_BACKEND}."
)
Expand All @@ -219,7 +229,7 @@ async def submit_workflow(
user=slurm_user,
)
else:
logging.info("Skip remote-subfolder creation")
logger.info("Skip remote-subfolder creation")
except Exception as e:
error_type = type(e).__name__
fail_job(
Expand Down Expand Up @@ -448,3 +458,4 @@ async def submit_workflow(
finally:
reset_logger_handlers(logger)
db_sync.close()
_zip_folder_to_file_and_remove(folder=job.working_dir)
110 changes: 110 additions & 0 deletions fractal_server/zip_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import os
import shutil
from io import BytesIO
from pathlib import Path
from typing import Iterator
from typing import TypeVar
from zipfile import ZIP_DEFLATED
from zipfile import ZipFile

T = TypeVar("T", str, BytesIO)
THRESHOLD_ZIP_FILE_SIZE_MB = 1.0


def _create_zip(folder: str, output: T) -> T:
"""
Zip a folder into a zip-file or into a BytesIO.
Args:
folder: Folder to be zipped.
output: Either a string with the path of the zip file, or a BytesIO
object.
Returns:
Either the zip-file path string, or the modified BytesIO object.
"""
if isinstance(output, str) and os.path.exists(output):
raise FileExistsError(f"Zip file '{output}' already exists")
if isinstance(output, BytesIO) and output.getbuffer().nbytes > 0:
raise ValueError("BytesIO is not empty")

with ZipFile(output, mode="w", compression=ZIP_DEFLATED) as zipfile:
for root, dirs, files in os.walk(folder):
for file in files:
file_path = os.path.join(root, file)
archive_path = os.path.relpath(file_path, folder)
zipfile.write(file_path, archive_path)
return output


def _zip_folder_to_byte_stream_iterator(folder: str) -> Iterator:
"""
Returns byte stream with the zipped log folder of a job.
Args:
folder: the folder to zip
"""
zip_file = Path(f"{folder}.zip")

if os.path.exists(zip_file):

def iterfile():
"""
https://fastapi.tiangolo.com/advanced/custom-response/#using-streamingresponse-with-file-like-objects
"""
with open(zip_file, mode="rb") as file_like:
yield from file_like

return iterfile()

else:

byte_stream = _create_zip(folder, output=BytesIO())
return iter([byte_stream.getvalue()])


def _folder_can_be_deleted(folder: str) -> bool:
"""
Given the path of a folder as string, returns `False` if either:
- the related zip file `{folder}.zip` does already exists,
- the folder and the zip file have a different number of internal files,
- the zip file has a very small size.
Otherwise returns `True`.
"""
# CHECK 1: zip file exists
zip_file = f"{folder}.zip"
if not os.path.exists(zip_file):
return False

# CHECK 2: folder and zip file have the same number of files
folder_files_count = sum(1 for f in Path(folder).rglob("*") if f.is_file())
with ZipFile(zip_file, "r") as zip_ref:
zip_files_count = len(zip_ref.namelist())
if folder_files_count != zip_files_count:
return False

# CHECK 3: zip file size is >= than `THRESHOLD_ZIP_FILE_SIZE_MB`
zip_size = os.path.getsize(zip_file)
if zip_size < THRESHOLD_ZIP_FILE_SIZE_MB * 1024 * 1024:
return False

return True


def _zip_folder_to_file_and_remove(folder: str) -> None:
    """
    Create a zip archive of `folder`, then remove the folder if safe.

    Steps:

    1. Zip `folder` into a temporary `{folder}_tmp.zip` archive.
    2. Rename the archive to `{folder}.zip` (this may overwrite a file
       with the same name that is already present).
    3. Delete `folder`, but only if `_folder_can_be_deleted` confirms the
       archive is a plausible replacement for it.
    """
    tmp_zip = f"{folder}_tmp.zip"
    final_zip = f"{folder}.zip"
    _create_zip(folder, tmp_zip)
    shutil.move(tmp_zip, final_zip)
    if _folder_can_be_deleted(folder):
        shutil.rmtree(folder)
Loading

0 comments on commit 096883f

Please sign in to comment.