Skip to content

Commit

Permalink
feat!: move VFS Logs to under sessionfolder/.vfs_logs (#259)
Browse files Browse the repository at this point in the history
* Moving VFS logs folder and vfs runtime cwd to within session folder.
* Moving the VFS log output folder to live under the session folder at sessiondir/.vfs_logs
* Moving new tests out from under skipif windows
* Removing the NotImplementedError raise from VFSProcessManager.__init__ on Windows so we can have at least some Windows code coverage, even though the VFS itself doesn't yet work on Windows.

Signed-off-by: Brian Axelson <baxelson@amazon.com>
  • Loading branch information
baxeaz authored Mar 28, 2024
1 parent 85f7a68 commit 28e16bb
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 84 deletions.
27 changes: 15 additions & 12 deletions src/deadline/job_attachments/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@
JobAttachmentsError,
MissingAssetRootError,
)
from .vfs import VFSProcessManager
from .vfs import (
VFSProcessManager,
VFS_CACHE_REL_PATH_IN_SESSION,
VFS_MANIFEST_FOLDER_IN_SESSION,
VFS_LOGS_FOLDER_IN_SESSION,
VFS_MANIFEST_FOLDER_PERMISSIONS,
)

from .models import (
Attachments,
FileConflictResolution,
Expand Down Expand Up @@ -65,15 +72,6 @@
download_logger = getLogger("deadline.job_attachments.download")

S3_DOWNLOAD_MAX_CONCURRENCY = 10
VFS_CACHE_REL_PATH_IN_SESSION = ".vfs_object_cache"
VFS_MERGED_MANIFEST_FOLDER_IN_SESSION = ".vfs_manifests"

VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS = PosixFileSystemPermissionSettings(
os_user="",
os_group="",
dir_mode=0o31,
file_mode=0o64,
)


def get_manifest_from_s3(
Expand Down Expand Up @@ -982,14 +980,19 @@ def mount_vfs_from_manifests(

_set_fs_group([str(asset_cache_hash_path)], str(vfs_cache_dir), fs_permission_settings)

manifest_dir: Path = session_dir / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION
manifest_dir: Path = session_dir / VFS_MANIFEST_FOLDER_IN_SESSION
manifest_dir.mkdir(parents=True, exist_ok=True)
manifest_dir_permissions = VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS
manifest_dir_permissions = VFS_MANIFEST_FOLDER_PERMISSIONS
manifest_dir_permissions.os_user = fs_permission_settings.os_user
manifest_dir_permissions.os_group = fs_permission_settings.os_group

_set_fs_group([str(manifest_dir)], str(manifest_dir), manifest_dir_permissions)

vfs_logs_dir: Path = session_dir / VFS_LOGS_FOLDER_IN_SESSION
vfs_logs_dir.mkdir(parents=True, exist_ok=True)

_set_fs_group([str(vfs_logs_dir)], str(vfs_logs_dir), fs_permission_settings)

for mount_point, manifest in manifests_by_root.items():
# Validate the file paths to see if they are under the given download directory.
_ensure_paths_within_directory(
Expand Down
6 changes: 6 additions & 0 deletions src/deadline/job_attachments/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,9 @@ class UnsupportedHashingAlgorithmError(JobAttachmentsError):
"""
Exception for when an unsupported hashing algorithm is provided.
"""


class VFSRunPathNotSetError(JobAttachmentsError):
    """
    Exception for when an operation needs the VFS run path but it hasn't
    been set (for example, looking up the VFS logs folder while the run
    path is still unset).
    """
58 changes: 40 additions & 18 deletions src/deadline/job_attachments/vfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path
import threading
Expand All @@ -14,8 +13,11 @@
VFSExecutableMissingError,
VFSFailedToMountError,
VFSLaunchScriptMissingError,
VFSRunPathNotSetError,
)

from .os_file_permission import PosixFileSystemPermissionSettings

log = logging.getLogger(__name__)

DEADLINE_VFS_ENV_VAR = "DEADLINE_VFS_PATH"
Expand All @@ -26,6 +28,17 @@
DEADLINE_VFS_PID_FILE_NAME = "vfs_pids.txt"
DEADLINE_MANIFEST_GROUP_READ_PERMS = 0o640

VFS_CACHE_REL_PATH_IN_SESSION = ".vfs_object_cache"
VFS_MANIFEST_FOLDER_IN_SESSION = ".vfs_manifests"
VFS_LOGS_FOLDER_IN_SESSION = ".vfs_logs"

VFS_MANIFEST_FOLDER_PERMISSIONS = PosixFileSystemPermissionSettings(
os_user="",
os_group="",
dir_mode=0o31,
file_mode=0o64,
)


class VFSProcessManager(object):
exe_path: Optional[str] = None
Expand Down Expand Up @@ -59,10 +72,6 @@ def __init__(
cas_prefix: Optional[str] = None,
asset_cache_path: Optional[str] = None,
):
# TODO: Once Windows pathmapping is implemented we can remove this
if sys.platform == "win32":
raise NotImplementedError("Windows is not currently supported for Job Attachments")

self._mount_point = mount_point
self._vfs_proc = None
self._vfs_thread = None
Expand Down Expand Up @@ -91,7 +100,7 @@ def kill_all_processes(cls, session_dir: Path, os_user: str) -> None:
for line in file.readlines():
line = line.strip()
mount_point, _, _ = line.split(":")
cls.shutdown_libfuse_mount(mount_point, os_user)
cls.shutdown_libfuse_mount(mount_point, os_user, session_dir)
os.remove(pid_file_path)
except FileNotFoundError:
log.warning(f"VFS pid file not found at {pid_file_path}")
Expand All @@ -110,7 +119,7 @@ def get_shutdown_args(cls, mount_path: str, os_user: str):
return ["sudo", "-u", os_user, fusermount3_path, "-u", mount_path]

@classmethod
def shutdown_libfuse_mount(cls, mount_path: str, os_user: str) -> bool:
def shutdown_libfuse_mount(cls, mount_path: str, os_user: str, session_dir: Path) -> bool:
"""
Shut down the mount at the provided path using the fusermount3 unmount option
as the provided user
Expand All @@ -126,7 +135,7 @@ def shutdown_libfuse_mount(cls, mount_path: str, os_user: str) -> bool:
log.warn(f"Shutdown failed with error {e}")
# Don't reraise, check if mount is gone
log.info(f"Shutdown returns {run_result.returncode}")
return cls.wait_for_mount(mount_path, expected=False)
return cls.wait_for_mount(mount_path, session_dir, expected=False)

@classmethod
def kill_process_at_mount(cls, session_dir: Path, mount_point: str, os_user: str) -> bool:
Expand Down Expand Up @@ -155,7 +164,7 @@ def kill_process_at_mount(cls, session_dir: Path, mount_point: str, os_user: str
else:
mount_for_pid, _, _ = line.split(":")
if mount_for_pid == mount_point:
cls.shutdown_libfuse_mount(mount_point, os_user)
cls.shutdown_libfuse_mount(mount_point, os_user, session_dir)
mount_point_found = True
else:
file.write(line)
Expand Down Expand Up @@ -202,10 +211,12 @@ def is_mount(cls, path) -> bool:
return subprocess.run(["findmnt", path]).returncode == 0

@classmethod
def wait_for_mount(cls, mount_path, mount_wait_seconds=60, expected=True) -> bool:
def wait_for_mount(cls, mount_path, session_dir, mount_wait_seconds=60, expected=True) -> bool:
"""
After we've launched the VFS subprocess we need to wait
for the OS to validate that the mount is in place before use
:param mount_path: Path to mount to watch for
:param session_dir: Session folder associated with mount
:param mount_wait_seconds: Duration to wait for mount state
:param expected: Wait for the mount to exist or no longer exist
"""
Expand All @@ -220,27 +231,38 @@ def wait_for_mount(cls, mount_path, mount_wait_seconds=60, expected=True) -> boo
log.info(f"is_mount on {mount_path} not {expected}, sleeping...")
time.sleep(1)
log.info(f"Failed to find is_mount {expected} at {mount_path} after {mount_wait_seconds}")
VFSProcessManager.print_log_end()
cls.print_log_end(session_dir)
return False

@classmethod
def get_logs_folder(cls) -> Union[os.PathLike, str]:
def logs_folder_path(cls, session_dir: Path) -> Union[os.PathLike, str]:
"""
Find the folder we expect VFS logs to be written to
"""
return os.path.join(os.path.dirname(VFSProcessManager.find_vfs()), "..", "logs")
return session_dir / VFS_LOGS_FOLDER_IN_SESSION

def get_logs_folder(self) -> Union[os.PathLike, str]:
    """
    Return the folder this manager's VFS logs are expected to be written to.

    The folder is resolved from the manager's run path via logs_folder_path.
    :raises VFSRunPathNotSetError: if the run path is unset or empty
    """
    run_path = self._run_path
    # Guard first: without a run path there is no folder to resolve.
    if not run_path:
        raise VFSRunPathNotSetError("Attempted to find logs folder without run path")
    return self.logs_folder_path(Path(run_path))

@classmethod
def print_log_end(cls, log_file_name="vfs_log.txt", lines=100, log_level=logging.WARNING):
def print_log_end(
self, session_dir: Path, log_file_name="vfs_log.txt", lines=100, log_level=logging.WARNING
):
"""
Print out the end of our VFS Log. Reads the full log file into memory. Our VFS logs are size
capped so this is not an issue for the intended use case.
:param session_dir: Session folder for mount
:param log_file_name: Name of file within the logs folder to read from. Defaults to vfs_log.txt which
is our "most recent" log file.
:param lines: Maximum number of lines from the end of the log to print
:param log_level: Level to print logging as
"""
log_file_path = os.path.join(VFSProcessManager.get_logs_folder(), log_file_name)
log_file_path = self.logs_folder_path(session_dir) / log_file_name
log.log(log_level, f"Printing last {lines} lines from {log_file_path}")
if not os.path.exists(log_file_path):
log.warning(f"No log file found at {log_file_path}")
Expand Down Expand Up @@ -432,7 +454,7 @@ def start(self, session_dir: Path) -> None:
Start our VFS process
:return: VFS process id
"""
self._run_path = VFSProcessManager.get_cwd()
self._run_path = session_dir
log.info(f"Using run_path {self._run_path}")
log.info(f"Using mount_point {self._mount_point}")
self.set_manifest_owner()
Expand All @@ -457,7 +479,7 @@ def read_output_thread(pipe, log):
args=start_command,
stdout=subprocess.PIPE, # Create a new pipe
stderr=subprocess.STDOUT, # Merge stderr into the stdout pipe
cwd=self._run_path,
cwd=str(self._run_path),
env=launch_env,
shell=True,
executable="/bin/bash",
Expand All @@ -472,7 +494,7 @@ def read_output_thread(pipe, log):
log.exception(f"Exception during launch with command {start_command} exception {e}")
raise e
log.info(f"Launched VFS as pid {self._vfs_proc.pid}")
if not VFSProcessManager.wait_for_mount(self.get_mount_point()):
if not VFSProcessManager.wait_for_mount(self.get_mount_point(), session_dir):
log.error("Failed to mount, shutting down")
raise VFSFailedToMountError

Expand Down
14 changes: 9 additions & 5 deletions test/unit/deadline_job_attachments/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@
_get_asset_root_from_s3,
_get_tasks_manifests_keys_from_s3,
VFS_CACHE_REL_PATH_IN_SESSION,
VFS_MERGED_MANIFEST_FOLDER_IN_SESSION,
VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS,
VFS_MANIFEST_FOLDER_IN_SESSION,
VFS_MANIFEST_FOLDER_PERMISSIONS,
VFS_LOGS_FOLDER_IN_SESSION,
)
from deadline.job_attachments.exceptions import (
AssetSyncError,
Expand Down Expand Up @@ -2337,12 +2338,14 @@ def test_mount_vfs_from_manifests(
manifest_permissions = PosixFileSystemPermissionSettings(
fs_permissions.os_user,
fs_permissions.os_group,
VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.dir_mode,
VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.file_mode,
VFS_MANIFEST_FOLDER_PERMISSIONS.dir_mode,
VFS_MANIFEST_FOLDER_PERMISSIONS.file_mode,
)

cache_path = temp_dir_path / VFS_CACHE_REL_PATH_IN_SESSION
manifest_path = temp_dir_path / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION
manifest_path = temp_dir_path / VFS_MANIFEST_FOLDER_IN_SESSION
logs_path = temp_dir_path / VFS_LOGS_FOLDER_IN_SESSION

with patch(
f"{deadline.__package__}.job_attachments.download._set_fs_group",
) as mock_set_vs_group, patch(
Expand Down Expand Up @@ -2372,6 +2375,7 @@ def test_mount_vfs_from_manifests(
[
call([str(cache_path / "cas/test")], str(cache_path), fs_permissions),
call([str(manifest_path)], str(manifest_path), manifest_permissions),
call([str(logs_path)], str(logs_path), fs_permissions),
]
)

Expand Down
Loading

0 comments on commit 28e16bb

Please sign in to comment.