Zip job folder after job is complete #1702

Merged: 50 commits, Aug 27, 2024
Commits
e689fff
archive job.working_dir
ychiucco Aug 6, 2024
d7f4928
fix type hint
ychiucco Aug 6, 2024
f938f52
zip in fileresponse
ychiucco Aug 7, 2024
9c93055
fix download endpoint
ychiucco Aug 7, 2024
0af75e8
fix test
ychiucco Aug 7, 2024
63fc408
change test using zip archive
ychiucco Aug 7, 2024
5ace5d7
Merge branch 'main' into 1521-zip-job-folder-after-job-is-complete
tcompa Aug 21, 2024
782a43e
Merge branch 'main' into 1521-zip-job-folder-after-job-is-complete
tcompa Aug 21, 2024
1978108
Merge branch 'main' into 1521-zip-job-folder-after-job-is-complete
ychiucco Aug 22, 2024
16c1b7d
create tmp file and rename it
ychiucco Aug 22, 2024
5e9eb63
always return StreamingResponse
ychiucco Aug 22, 2024
883489c
use iterator
ychiucco Aug 22, 2024
6232a3c
move _zip_folder_to_byte_stream to fractal_server/zip_tools.py
ychiucco Aug 22, 2024
82e5fcf
new _zip_folder_to_file function
ychiucco Aug 22, 2024
152f7a4
use updated function
ychiucco Aug 22, 2024
ff4db61
remove type hint [skip ci]
ychiucco Aug 22, 2024
72cb153
fix test
ychiucco Aug 22, 2024
5395d23
Merge branch 'main' into 1521-zip-job-folder-after-job-is-complete
ychiucco Aug 22, 2024
880a83f
add unit test
ychiucco Aug 22, 2024
43da630
add checks
ychiucco Aug 23, 2024
ca97008
improve unittest structure
ychiucco Aug 23, 2024
641493b
avoid repetition [skip ci]
ychiucco Aug 23, 2024
e4ad58a
extract `_create_zip` function
ychiucco Aug 26, 2024
041a825
less lines for folder_files and size
ychiucco Aug 26, 2024
8572b07
rename in _folder_can_be_deleted
ychiucco Aug 26, 2024
caeb13e
check for absolute zip size
ychiucco Aug 26, 2024
2837b64
check number of files, not names
ychiucco Aug 26, 2024
54716ed
rename function
ychiucco Aug 26, 2024
10c6256
split tests
ychiucco Aug 26, 2024
cf212ba
add error message
ychiucco Aug 26, 2024
262c825
db operation in try except
ychiucco Aug 26, 2024
7e11fab
change error
ychiucco Aug 26, 2024
9169989
docstring
ychiucco Aug 26, 2024
81022e6
use and reset logger
ychiucco Aug 27, 2024
a24a4fb
use str(e)
ychiucco Aug 27, 2024
1e3d2eb
Update zip_tools.py
tcompa Aug 27, 2024
b691bf4
add check for non-empty bytesio
ychiucco Aug 27, 2024
acd4f31
check on file count
ychiucco Aug 27, 2024
e3e828d
define THRESHOLD_ZIP_FILE_SIZE_MB const
ychiucco Aug 27, 2024
9d4644f
docstring for `_zip_folder_to_file_and_remove`
ychiucco Aug 27, 2024
f3188c6
fix test with monkeypatch
ychiucco Aug 27, 2024
abecbbc
unittest _create_zip
ychiucco Aug 27, 2024
3f9e666
revamp zip_tools unit tests
ychiucco Aug 27, 2024
345c692
use rglob
ychiucco Aug 27, 2024
36efecc
remove useless if branch
ychiucco Aug 27, 2024
ee29ced
use len
ychiucco Aug 27, 2024
88731c5
changelog and rename response file name
ychiucco Aug 27, 2024
720a67b
Update CHANGELOG.md [skip ci]
tcompa Aug 27, 2024
2310699
Merge branch 'main' into 1521-zip-job-folder-after-job-is-complete
ychiucco Aug 27, 2024
5192d5f
Update CHANGELOG.md [skip ci]
tcompa Aug 27, 2024
8 changes: 6 additions & 2 deletions CHANGELOG.md
@@ -1,12 +1,16 @@
**Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository.

# 2.3.8 (Unreleased)
# 2.3.8

> NOTE: `FRACTAL_API_V1_MODE="include_without_submission"` is now transformed
> into `FRACTAL_API_V1_MODE="include_read_only"`.

* API
* API:
* Support read-only mode for V1 (\#1701).
* Improve handling of zipped job-folder in download-logs endpoints (\#1702).
* Runner:
* Improve database-error handling in V2 job execution (\#1702).
* Zip job folder after job execution (\#1702).
* App:
* `UvicornWorker` is now imported from `uvicorn-worker` (\#1690).
* Testing:
5 changes: 2 additions & 3 deletions fractal_server/app/routes/admin/v1.py
@@ -18,6 +18,7 @@
from ....config import get_settings
from ....syringe import Inject
from ....utils import get_timestamp
from ....zip_tools import _zip_folder_to_byte_stream_iterator
from ...db import AsyncSession
from ...db import get_async_db
from ...models.security import UserOAuth as User
@@ -34,7 +35,6 @@
from ...schemas.v1 import WorkflowReadV1
from ...security import current_active_superuser
from ..aux._job import _write_shutdown_file
from ..aux._job import _zip_folder_to_byte_stream
from ..aux._runner import _check_shutdown_is_supported

router_admin_v1 = APIRouter()
@@ -387,9 +387,8 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
_zip_folder_to_byte_stream_iterator(folder=job.working_dir),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
)
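
Note: all four download-logs endpoints now share this pattern. Below is a minimal, self-contained sketch of how `_zip_folder_to_byte_stream_iterator` plugs into FastAPI's `StreamingResponse`; the demo app, route path, and `working_dir` query parameter are hypothetical and not part of this PR.

```python
from pathlib import Path

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from fractal_server.zip_tools import _zip_folder_to_byte_stream_iterator

app = FastAPI()


@app.get("/demo/download-logs/")
def download_logs_demo(working_dir: str) -> StreamingResponse:
    # Stream the zipped job folder: the iterator either reads an existing
    # `{working_dir}.zip` from disk or zips the folder in memory.
    zip_filename = f"{Path(working_dir).name}_archive.zip"
    return StreamingResponse(
        _zip_folder_to_byte_stream_iterator(folder=working_dir),
        media_type="application/x-zip-compressed",
        headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
    )
```
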
5 changes: 2 additions & 3 deletions fractal_server/app/routes/admin/v2.py
@@ -21,6 +21,7 @@
from ....config import get_settings
from ....syringe import Inject
from ....utils import get_timestamp
from ....zip_tools import _zip_folder_to_byte_stream_iterator
from ...db import AsyncSession
from ...db import get_async_db
from ...models.security import UserOAuth as User
@@ -37,7 +38,6 @@
from ...schemas.v2 import ProjectReadV2
from ...security import current_active_superuser
from ..aux._job import _write_shutdown_file
from ..aux._job import _zip_folder_to_byte_stream
from ..aux._runner import _check_shutdown_is_supported

router_admin_v2 = APIRouter()
@@ -274,9 +274,8 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
_zip_folder_to_byte_stream_iterator(folder=job.working_dir),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
)
5 changes: 2 additions & 3 deletions fractal_server/app/routes/api/v1/job.py
@@ -8,6 +8,7 @@
from fastapi.responses import StreamingResponse
from sqlmodel import select

from .....zip_tools import _zip_folder_to_byte_stream_iterator
from ....db import AsyncSession
from ....db import get_async_db
from ....models.v1 import ApplyWorkflow
@@ -18,7 +19,6 @@
from ....security import current_active_user
from ....security import User
from ...aux._job import _write_shutdown_file
from ...aux._job import _zip_folder_to_byte_stream
from ...aux._runner import _check_shutdown_is_supported
from ._aux_functions import _get_job_check_owner
from ._aux_functions import _get_project_check_owner
@@ -128,9 +128,8 @@ async def download_job_logs(
# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
return StreamingResponse(
iter([byte_stream.getvalue()]),
_zip_folder_to_byte_stream_iterator(folder=job.working_dir),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
)
14 changes: 5 additions & 9 deletions fractal_server/app/routes/api/v2/job.py
@@ -8,6 +8,7 @@
from fastapi.responses import StreamingResponse
from sqlmodel import select

from .....zip_tools import _zip_folder_to_byte_stream_iterator
from ....db import AsyncSession
from ....db import get_async_db
from ....models.v2 import JobV2
@@ -18,7 +19,6 @@
from ....security import current_active_user
from ....security import User
from ...aux._job import _write_shutdown_file
from ...aux._job import _zip_folder_to_byte_stream
from ...aux._runner import _check_shutdown_is_supported
from ._aux_functions import _get_job_check_owner
from ._aux_functions import _get_project_check_owner
@@ -118,7 +118,7 @@ async def download_job_logs(
db: AsyncSession = Depends(get_async_db),
) -> StreamingResponse:
"""
Download job folder
Download zipped job folder
"""
output = await _get_job_check_owner(
project_id=project_id,
@@ -127,15 +127,11 @@
db=db,
)
job = output["job"]

# Create and return byte stream for zipped log folder
PREFIX_ZIP = Path(job.working_dir).name
zip_filename = f"{PREFIX_ZIP}_archive.zip"
byte_stream = _zip_folder_to_byte_stream(folder=job.working_dir)
zip_name = f"{Path(job.working_dir).name}_archive.zip"
return StreamingResponse(
iter([byte_stream.getvalue()]),
_zip_folder_to_byte_stream_iterator(folder=job.working_dir),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
headers={"Content-Disposition": f"attachment;filename={zip_name}"},
)


23 changes: 0 additions & 23 deletions fractal_server/app/routes/aux/_job.py
@@ -1,9 +1,5 @@
import os
from io import BytesIO
from pathlib import Path
from typing import Union
from zipfile import ZIP_DEFLATED
from zipfile import ZipFile

from ...models.v1 import ApplyWorkflow
from ...models.v2 import JobV2
@@ -24,22 +20,3 @@ def _write_shutdown_file(*, job: Union[ApplyWorkflow, JobV2]):
shutdown_file = Path(job.working_dir) / SHUTDOWN_FILENAME
with shutdown_file.open("w") as f:
f.write(f"Trigger executor shutdown for {job.id=}.")


def _zip_folder_to_byte_stream(*, folder: str) -> BytesIO:
"""
Get byte stream with the zipped log folder of a job.

Args:
folder: the folder to zip
"""

byte_stream = BytesIO()
with ZipFile(byte_stream, mode="w", compression=ZIP_DEFLATED) as zipfile:
for root, dirs, files in os.walk(folder):
for file in files:
file_path = os.path.join(root, file)
archive_path = os.path.relpath(file_path, folder)
zipfile.write(file_path, archive_path)

return byte_stream
53 changes: 32 additions & 21 deletions fractal_server/app/runner/v2/__init__.py
@@ -5,7 +5,6 @@
Other subsystems should only import this module and not its submodules or
the individual backends.
"""
import logging
import os
import traceback
from pathlib import Path
@@ -21,6 +20,7 @@
from ....ssh._fabric import FractalSSH
from ....syringe import Inject
from ....utils import get_timestamp
from ....zip_tools import _zip_folder_to_file_and_remove
from ...db import DB
from ...models.v2 import DatasetV2
from ...models.v2 import JobV2
@@ -114,9 +114,34 @@ async def submit_workflow(

with next(DB.get_sync_db()) as db_sync:

job: JobV2 = db_sync.get(JobV2, job_id)
if not job:
try:
job: Optional[JobV2] = db_sync.get(JobV2, job_id)
dataset: Optional[DatasetV2] = db_sync.get(DatasetV2, dataset_id)
workflow: Optional[WorkflowV2] = db_sync.get(
WorkflowV2, workflow_id
)
except Exception as e:
logger.error(
f"Error conneting to the database. Original error: {str(e)}"
)
reset_logger_handlers(logger)
return

if job is None:
logger.error(f"JobV2 {job_id} does not exist")
reset_logger_handlers(logger)
return
if dataset is None or workflow is None:
log_msg = ""
if not dataset:
log_msg += f"Cannot fetch dataset {dataset_id} from database\n"
if not workflow:
log_msg += (
f"Cannot fetch workflow {workflow_id} from database\n"
)
fail_job(
db=db_sync, job=job, log_msg=log_msg, logger_name=logger_name
)
return

# Declare runner backend and set `process_workflow` function
@@ -137,21 +162,6 @@
)
return

dataset: DatasetV2 = db_sync.get(DatasetV2, dataset_id)
workflow: WorkflowV2 = db_sync.get(WorkflowV2, workflow_id)
if not (dataset and workflow):
log_msg = ""
if not dataset:
log_msg += f"Cannot fetch dataset {dataset_id} from database\n"
if not workflow:
log_msg += (
f"Cannot fetch workflow {workflow_id} from database\n"
)
fail_job(
db=db_sync, job=job, log_msg=log_msg, logger_name=logger_name
)
return

# Define and create server-side working folder
WORKFLOW_DIR_LOCAL = Path(job.working_dir)
if WORKFLOW_DIR_LOCAL.exists():
@@ -192,9 +202,9 @@ async def submit_workflow(
fractal_ssh.mkdir(
folder=str(WORKFLOW_DIR_REMOTE),
)
logging.info(f"Created {str(WORKFLOW_DIR_REMOTE)} via SSH.")
logger.info(f"Created {str(WORKFLOW_DIR_REMOTE)} via SSH.")
else:
logging.error(
logger.error(
"Invalid FRACTAL_RUNNER_BACKEND="
f"{settings.FRACTAL_RUNNER_BACKEND}."
)
@@ -219,7 +229,7 @@
user=slurm_user,
)
else:
logging.info("Skip remote-subfolder creation")
logger.info("Skip remote-subfolder creation")
except Exception as e:
error_type = type(e).__name__
fail_job(
@@ -448,3 +458,4 @@ async def submit_workflow(
finally:
reset_logger_handlers(logger)
db_sync.close()
_zip_folder_to_file_and_remove(folder=job.working_dir)
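
Note: the new call in the `finally` block is what guarantees the working directory gets archived whatever the job outcome. A rough sketch of that lifecycle under simplified assumptions (`run_job` and `execute_and_archive` are hypothetical stand-ins, not fractal-server functions):

```python
from pathlib import Path

from fractal_server.zip_tools import _zip_folder_to_file_and_remove


def run_job(working_dir: Path) -> None:
    # Hypothetical stand-in for the backend execution step.
    (working_dir / "workflow.log").write_text("job finished\n")


def execute_and_archive(working_dir: str) -> None:
    try:
        run_job(Path(working_dir))
    finally:
        # Always produce `{working_dir}.zip`; the original folder is removed
        # only if the archive passes the checks in `_folder_can_be_deleted`.
        _zip_folder_to_file_and_remove(folder=working_dir)
```
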
110 changes: 110 additions & 0 deletions fractal_server/zip_tools.py
@@ -0,0 +1,110 @@
import os
import shutil
from io import BytesIO
from pathlib import Path
from typing import Iterator
from typing import TypeVar
from zipfile import ZIP_DEFLATED
from zipfile import ZipFile

T = TypeVar("T", str, BytesIO)
THRESHOLD_ZIP_FILE_SIZE_MB = 1.0


def _create_zip(folder: str, output: T) -> T:
"""
Zip a folder into a zip-file or into a BytesIO.

Args:
folder: Folder to be zipped.
output: Either a string with the path of the zip file, or a BytesIO
object.

Returns:
Either the zip-file path string, or the modified BytesIO object.
"""
if isinstance(output, str) and os.path.exists(output):
raise FileExistsError(f"Zip file '{output}' already exists")
if isinstance(output, BytesIO) and output.getbuffer().nbytes > 0:
raise ValueError("BytesIO is not empty")

with ZipFile(output, mode="w", compression=ZIP_DEFLATED) as zipfile:
for root, dirs, files in os.walk(folder):
for file in files:
file_path = os.path.join(root, file)
archive_path = os.path.relpath(file_path, folder)
zipfile.write(file_path, archive_path)
return output


def _zip_folder_to_byte_stream_iterator(folder: str) -> Iterator:
"""
Returns an iterator over the byte stream of the zipped job folder.

Args:
folder: the folder to zip
"""
zip_file = Path(f"{folder}.zip")

if os.path.exists(zip_file):

def iterfile():
"""
https://fastapi.tiangolo.com/advanced/custom-response/#using-streamingresponse-with-file-like-objects
"""
with open(zip_file, mode="rb") as file_like:
yield from file_like

return iterfile()

else:

byte_stream = _create_zip(folder, output=BytesIO())
return iter([byte_stream.getvalue()])


def _folder_can_be_deleted(folder: str) -> bool:
"""
Given the path of a folder as a string, returns `False` if any of the
following holds:
- the related zip file `{folder}.zip` does not exist,
- the folder and the zip file contain a different number of files,
- the zip file is smaller than `THRESHOLD_ZIP_FILE_SIZE_MB`.
Otherwise returns `True`.
"""
# CHECK 1: zip file exists
zip_file = f"{folder}.zip"
if not os.path.exists(zip_file):
return False

# CHECK 2: folder and zip file have the same number of files
folder_files_count = sum(1 for f in Path(folder).rglob("*") if f.is_file())
with ZipFile(zip_file, "r") as zip_ref:
zip_files_count = len(zip_ref.namelist())
if folder_files_count != zip_files_count:
return False

# CHECK 3: zip-file size is at least `THRESHOLD_ZIP_FILE_SIZE_MB`
zip_size = os.path.getsize(zip_file)
if zip_size < THRESHOLD_ZIP_FILE_SIZE_MB * 1024 * 1024:
return False

return True


def _zip_folder_to_file_and_remove(folder: str) -> None:
"""
Creates a ZIP archive of the specified folder and removes the original
folder (if it can be deleted).

This function performs the following steps:
1. Creates a ZIP archive of the `folder` and names it with a temporary
suffix `_tmp.zip`.
2. Renames the ZIP file, removing the suffix (this may overwrite an
existing file with the same name).
3. Checks if the folder can be safely deleted using the
`_folder_can_be_deleted` function. If so, deletes the original folder.
"""
_create_zip(folder, f"{folder}_tmp.zip")
shutil.move(f"{folder}_tmp.zip", f"{folder}.zip")
if _folder_can_be_deleted(folder):
shutil.rmtree(folder)
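
Note: a minimal end-to-end sketch of the new helpers, in the spirit of the unit tests added in this PR (the temporary-directory setup and file contents are illustrative, not taken from the test suite):

```python
import tempfile
from io import BytesIO
from pathlib import Path

from fractal_server.zip_tools import _create_zip
from fractal_server.zip_tools import _folder_can_be_deleted
from fractal_server.zip_tools import _zip_folder_to_file_and_remove

with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp) / "job_0001"
    folder.mkdir()
    (folder / "workflow.log").write_text("hello\n" * 10)

    # In-memory archive (BytesIO output), as used by the download endpoints.
    stream = _create_zip(str(folder), output=BytesIO())
    assert stream.getbuffer().nbytes > 0

    # On-disk archive: `{folder}.zip` is created, but the folder itself is
    # kept because the archive is far below THRESHOLD_ZIP_FILE_SIZE_MB (1 MB).
    _zip_folder_to_file_and_remove(folder=str(folder))
    assert Path(f"{folder}.zip").exists()
    assert folder.exists()
    assert _folder_can_be_deleted(str(folder)) is False
```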